1use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::{RwLock, Semaphore};
9
10#[cfg(feature = "serde")]
11use serde::{Deserialize, Serialize};
12
13use crate::{Result, VectorError};
14
15#[derive(Debug, Clone)]
17#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
18pub struct ConnectionPoolConfig {
19 pub max_connections: usize,
21 pub min_connections: usize,
23 pub connection_timeout: Duration,
25 pub idle_timeout: Duration,
27 pub max_retries: u32,
29}
30
31impl Default for ConnectionPoolConfig {
32 fn default() -> Self {
33 Self {
34 max_connections: 10,
35 min_connections: 2,
36 connection_timeout: Duration::from_secs(30),
37 idle_timeout: Duration::from_secs(300),
38 max_retries: 3,
39 }
40 }
41}
42
43#[derive(Debug, Clone)]
45#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
46pub struct CacheConfig {
47 pub max_entries: usize,
49 pub ttl: Duration,
51 pub enable_lru: bool,
53 pub stats_interval: Duration,
55}
56
57impl Default for CacheConfig {
58 fn default() -> Self {
59 Self {
60 max_entries: 1000,
61 ttl: Duration::from_secs(3600),
62 enable_lru: true,
63 stats_interval: Duration::from_secs(60),
64 }
65 }
66}
67
68pub struct ConnectionPool<T> {
70 config: ConnectionPoolConfig,
71 connections: Arc<RwLock<Vec<PooledConnection<T>>>>,
72 semaphore: Arc<Semaphore>,
73 stats: Arc<RwLock<ConnectionPoolStats>>,
74}
75
76pub struct PooledConnection<T> {
78 connection: T,
79 created_at: Instant,
80 last_used: Instant,
81 is_active: bool,
82}
83
84#[derive(Debug, Default, Clone)]
86pub struct ConnectionPoolStats {
87 pub total_connections: usize,
88 pub active_connections: usize,
89 pub idle_connections: usize,
90 pub total_requests: u64,
91 pub successful_requests: u64,
92 pub failed_requests: u64,
93 pub average_wait_time: Duration,
94}
95
96impl<T> ConnectionPool<T> {
97 pub fn new(config: ConnectionPoolConfig) -> Self {
99 let semaphore = Arc::new(Semaphore::new(config.max_connections));
100
101 Self {
102 config,
103 connections: Arc::new(RwLock::new(Vec::new())),
104 semaphore,
105 stats: Arc::new(RwLock::new(ConnectionPoolStats::default())),
106 }
107 }
108
109 pub async fn get_connection(&self) -> Result<PooledConnection<T>>
111 where
112 T: Clone,
113 {
114 let start_time = Instant::now();
115
116 let _permit = self.semaphore.acquire().await
118 .map_err(|e| VectorError::ConnectionFailed(format!("Failed to acquire connection: {}", e)))?;
119
120 let mut connections = self.connections.write().await;
121
122 if let Some(pos) = connections.iter().position(|conn| !conn.is_active) {
124 let mut conn = connections.remove(pos);
125 conn.is_active = true;
126 conn.last_used = Instant::now();
127
128 let mut stats = self.stats.write().await;
130 stats.total_requests += 1;
131 stats.successful_requests += 1;
132 stats.average_wait_time = start_time.elapsed();
133
134 return Ok(conn);
135 }
136
137 if connections.len() < self.config.max_connections {
139 return Err(VectorError::ConnectionFailed("Connection creation not implemented".to_string()));
141 }
142
143 Err(VectorError::ConnectionFailed("No available connections".to_string()))
144 }
145
146 pub async fn return_connection(&self, mut connection: PooledConnection<T>) {
148 connection.is_active = false;
149 connection.last_used = Instant::now();
150
151 let mut connections = self.connections.write().await;
152 connections.push(connection);
153 }
154
155 pub async fn get_stats(&self) -> ConnectionPoolStats {
157 self.stats.read().await.clone()
158 }
159
160 pub async fn cleanup_expired_connections(&self) {
162 let mut connections = self.connections.write().await;
163 let now = Instant::now();
164
165 connections.retain(|conn| {
166 !conn.is_active && now.duration_since(conn.last_used) < self.config.idle_timeout
167 });
168 }
169}
170
171#[derive(Debug, Clone)]
173pub struct CacheEntry<T> {
174 pub value: T,
175 pub created_at: Instant,
176 pub last_accessed: Instant,
177 pub access_count: u64,
178}
179
180pub struct LRUCache<K, V> {
182 config: CacheConfig,
183 entries: Arc<RwLock<HashMap<K, CacheEntry<V>>>>,
184 stats: Arc<RwLock<CacheStats>>,
185}
186
187#[derive(Debug, Default, Clone)]
189pub struct CacheStats {
190 pub total_requests: u64,
191 pub cache_hits: u64,
192 pub cache_misses: u64,
193 pub evictions: u64,
194 pub current_size: usize,
195 pub hit_rate: f64,
196}
197
198impl<K, V> LRUCache<K, V>
199where
200 K: std::hash::Hash + Eq + Clone,
201 V: Clone,
202{
203 pub fn new(config: CacheConfig) -> Self {
205 Self {
206 config,
207 entries: Arc::new(RwLock::new(HashMap::new())),
208 stats: Arc::new(RwLock::new(CacheStats::default())),
209 }
210 }
211
212 pub async fn get(&self, key: &K) -> Option<V> {
214 let mut stats = self.stats.write().await;
215 stats.total_requests += 1;
216
217 let mut entries = self.entries.write().await;
218
219 if let Some(entry) = entries.get_mut(key) {
220 let now = Instant::now();
221
222 if now.duration_since(entry.created_at) > self.config.ttl {
224 entries.remove(key);
225 stats.cache_misses += 1;
226 stats.current_size = entries.len();
227 return None;
228 }
229
230 entry.last_accessed = now;
232 entry.access_count += 1;
233
234 stats.cache_hits += 1;
235 stats.hit_rate = stats.cache_hits as f64 / stats.total_requests as f64;
236
237 Some(entry.value.clone())
238 } else {
239 stats.cache_misses += 1;
240 stats.hit_rate = stats.cache_hits as f64 / stats.total_requests as f64;
241 None
242 }
243 }
244
245 pub async fn set(&self, key: K, value: V) {
247 let mut entries = self.entries.write().await;
248 let now = Instant::now();
249
250 if entries.len() >= self.config.max_entries && !entries.contains_key(&key) {
252 self.evict_lru(&mut entries).await;
253 }
254
255 let entry = CacheEntry {
256 value,
257 created_at: now,
258 last_accessed: now,
259 access_count: 1,
260 };
261
262 entries.insert(key, entry);
263
264 let mut stats = self.stats.write().await;
265 stats.current_size = entries.len();
266 }
267
268 pub async fn remove(&self, key: &K) -> Option<V> {
270 let mut entries = self.entries.write().await;
271 let result = entries.remove(key).map(|entry| entry.value);
272
273 let mut stats = self.stats.write().await;
274 stats.current_size = entries.len();
275
276 result
277 }
278
279 pub async fn clear(&self) {
281 let mut entries = self.entries.write().await;
282 entries.clear();
283
284 let mut stats = self.stats.write().await;
285 stats.current_size = 0;
286 }
287
288 pub async fn get_stats(&self) -> CacheStats {
290 self.stats.read().await.clone()
291 }
292
293 async fn evict_lru(&self, entries: &mut HashMap<K, CacheEntry<V>>) {
295 if entries.is_empty() {
296 return;
297 }
298
299 let mut oldest_key = None;
301 let mut oldest_time = Instant::now();
302
303 for (key, entry) in entries.iter() {
304 if entry.last_accessed < oldest_time {
305 oldest_time = entry.last_accessed;
306 oldest_key = Some(key.clone());
307 }
308 }
309
310 if let Some(key) = oldest_key {
311 entries.remove(&key);
312
313 let mut stats = self.stats.write().await;
314 stats.evictions += 1;
315 }
316 }
317}
318
319pub struct PerformanceMonitor {
321 metrics: Arc<RwLock<PerformanceMetrics>>,
322}
323
324#[derive(Debug, Default, Clone)]
326pub struct PerformanceMetrics {
327 pub total_operations: u64,
328 pub successful_operations: u64,
329 pub failed_operations: u64,
330 pub average_response_time: Duration,
331 pub min_response_time: Duration,
332 pub max_response_time: Duration,
333 pub operations_per_second: f64,
334 pub memory_usage_mb: f64,
335 pub cpu_usage_percent: f64,
336}
337
338impl PerformanceMonitor {
339 pub fn new() -> Self {
341 Self {
342 metrics: Arc::new(RwLock::new(PerformanceMetrics::default())),
343 }
344 }
345
346 pub async fn record_operation(&self, duration: Duration, success: bool) {
348 let mut metrics = self.metrics.write().await;
349
350 metrics.total_operations += 1;
351
352 if success {
353 metrics.successful_operations += 1;
354 } else {
355 metrics.failed_operations += 1;
356 }
357
358 if metrics.total_operations == 1 {
360 metrics.min_response_time = duration;
361 metrics.max_response_time = duration;
362 metrics.average_response_time = duration;
363 } else {
364 if duration < metrics.min_response_time {
365 metrics.min_response_time = duration;
366 }
367 if duration > metrics.max_response_time {
368 metrics.max_response_time = duration;
369 }
370
371 let total_time = metrics.average_response_time.as_nanos() as f64 * (metrics.total_operations - 1) as f64;
373 metrics.average_response_time = Duration::from_nanos(
374 ((total_time + duration.as_nanos() as f64) / metrics.total_operations as f64) as u64
375 );
376 }
377 }
378
379 pub async fn get_metrics(&self) -> PerformanceMetrics {
381 self.metrics.read().await.clone()
382 }
383
384 pub async fn reset_metrics(&self) {
386 let mut metrics = self.metrics.write().await;
387 *metrics = PerformanceMetrics::default();
388 }
389}