1use std::collections::{HashMap, VecDeque};
7use std::sync::{Arc, Mutex};
8use std::time::{Duration, Instant};
9use serde::{Serialize, Deserialize};
10use tokio::sync::RwLock;
11
12#[derive(Debug, Clone)]
14pub struct PerformanceConfig {
15 pub enable_connection_pooling: bool,
16 pub max_pool_size: usize,
17 pub enable_message_batching: bool,
18 pub batch_size: usize,
19 pub batch_timeout: Duration,
20 pub enable_caching: bool,
21 pub cache_size: usize,
22 pub cache_ttl: Duration,
23 pub enable_compression: bool,
24 pub compression_threshold: usize,
25 pub enable_metrics: bool,
26}
27
28impl Default for PerformanceConfig {
29 fn default() -> Self {
30 Self {
31 enable_connection_pooling: true,
32 max_pool_size: 10,
33 enable_message_batching: true,
34 batch_size: 100,
35 batch_timeout: Duration::from_millis(10),
36 enable_caching: true,
37 cache_size: 1000,
38 cache_ttl: Duration::from_secs(300),
39 enable_compression: true,
40 compression_threshold: 1024,
41 enable_metrics: true,
42 }
43 }
44}
45
46pub struct PerformanceManager {
48 config: PerformanceConfig,
49 connection_pool: Option<ConnectionPool>,
50 message_batcher: Option<MessageBatcher>,
51 cache: Option<MessageCache>,
52 metrics_collector: Option<MetricsCollector>,
53}
54
55impl PerformanceManager {
56 pub fn new(config: PerformanceConfig) -> Self {
57 let connection_pool = if config.enable_connection_pooling {
58 Some(ConnectionPool::new(config.max_pool_size))
59 } else {
60 None
61 };
62
63 let message_batcher = if config.enable_message_batching {
64 Some(MessageBatcher::new(config.batch_size, config.batch_timeout))
65 } else {
66 None
67 };
68
69 let cache = if config.enable_caching {
70 Some(MessageCache::new(config.cache_size, config.cache_ttl))
71 } else {
72 None
73 };
74
75 let metrics_collector = if config.enable_metrics {
76 Some(MetricsCollector::new())
77 } else {
78 None
79 };
80
81 Self {
82 config,
83 connection_pool,
84 message_batcher,
85 cache,
86 metrics_collector,
87 }
88 }
89
90 pub async fn get_connection(&self, url: &str) -> Result<PooledConnection, PerformanceError> {
92 if let Some(pool) = &self.connection_pool {
93 pool.get_connection(url).await
94 } else {
95 Err(PerformanceError::PoolingDisabled)
96 }
97 }
98
99 pub async fn return_connection(&self, connection: PooledConnection) {
101 if let Some(pool) = &self.connection_pool {
102 pool.return_connection(connection).await;
103 }
104 }
105
106 pub async fn queue_message(&self, message: Vec<u8>) -> Result<(), PerformanceError> {
108 if let Some(batcher) = &self.message_batcher {
109 batcher.add_message(message).await
110 } else {
111 Err(PerformanceError::BatchingDisabled)
112 }
113 }
114
115 pub async fn flush_messages(&self) -> Result<Vec<Vec<u8>>, PerformanceError> {
117 if let Some(batcher) = &self.message_batcher {
118 Ok(batcher.flush_messages().await)
119 } else {
120 Ok(vec![])
121 }
122 }
123
124 pub async fn get_cached(&self, key: &str) -> Option<Vec<u8>> {
126 if let Some(cache) = &self.cache {
127 cache.get(key).await
128 } else {
129 None
130 }
131 }
132
133 pub async fn set_cached(&self, key: String, value: Vec<u8>) {
135 if let Some(cache) = &self.cache {
136 cache.set(key, value).await;
137 }
138 }
139
140 pub fn record_metric(&self, name: &str, value: f64, tags: Option<HashMap<String, String>>) {
142 if let Some(collector) = &self.metrics_collector {
143 collector.record_metric(name, value, tags);
144 }
145 }
146
147 pub fn get_metrics(&self) -> Option<PerformanceMetrics> {
149 self.metrics_collector.as_ref().map(|c| c.get_metrics())
150 }
151
152 pub fn should_compress(&self, message_size: usize) -> bool {
154 self.config.enable_compression && message_size >= self.config.compression_threshold
155 }
156}
157
158pub struct ConnectionPool {
160 max_size: usize,
161 connections: Arc<RwLock<HashMap<String, VecDeque<PooledConnection>>>>,
162 total_connections: Arc<Mutex<usize>>,
163}
164
165impl ConnectionPool {
166 pub fn new(max_size: usize) -> Self {
167 Self {
168 max_size,
169 connections: Arc::new(RwLock::new(HashMap::new())),
170 total_connections: Arc::new(Mutex::new(0)),
171 }
172 }
173
174 pub async fn get_connection(&self, url: &str) -> Result<PooledConnection, PerformanceError> {
175 let mut connections = self.connections.write().await;
176
177 if let Some(pool) = connections.get_mut(url) {
178 if let Some(connection) = pool.pop_front() {
179 return Ok(connection);
180 }
181 }
182
183 let total = *self.total_connections.lock().unwrap();
185 if total < self.max_size {
186 *self.total_connections.lock().unwrap() += 1;
187 Ok(PooledConnection::new(url.to_string()))
188 } else {
189 Err(PerformanceError::PoolExhausted)
190 }
191 }
192
193 pub async fn return_connection(&self, connection: PooledConnection) {
194 if connection.is_healthy() {
195 let mut connections = self.connections.write().await;
196 let pool = connections.entry(connection.url.clone()).or_insert_with(VecDeque::new);
197 pool.push_back(connection);
198 } else {
199 *self.total_connections.lock().unwrap() -= 1;
201 }
202 }
203
204 pub async fn cleanup_idle_connections(&self) {
205 let mut connections = self.connections.write().await;
206 let cutoff = Instant::now() - Duration::from_secs(300); for pool in connections.values_mut() {
209 let original_len = pool.len();
210 pool.retain(|conn| conn.last_used > cutoff);
211 let removed = original_len - pool.len();
212
213 if removed > 0 {
214 *self.total_connections.lock().unwrap() -= removed;
215 }
216 }
217 }
218}
219
220#[derive(Debug, Clone)]
222pub struct PooledConnection {
223 pub url: String,
224 pub created_at: Instant,
225 pub last_used: Instant,
226 pub request_count: u64,
227 pub is_connected: bool,
228}
229
230impl PooledConnection {
231 pub fn new(url: String) -> Self {
232 let now = Instant::now();
233 Self {
234 url,
235 created_at: now,
236 last_used: now,
237 request_count: 0,
238 is_connected: true,
239 }
240 }
241
242 pub fn is_healthy(&self) -> bool {
243 self.is_connected && self.last_used.elapsed() < Duration::from_secs(60)
244 }
245
246 pub fn mark_used(&mut self) {
247 self.last_used = Instant::now();
248 self.request_count += 1;
249 }
250}
251
252pub struct MessageBatcher {
254 batch_size: usize,
255 batch_timeout: Duration,
256 pending_messages: Arc<Mutex<VecDeque<Vec<u8>>>>,
257 last_flush: Arc<Mutex<Instant>>,
258}
259
260impl MessageBatcher {
261 pub fn new(batch_size: usize, batch_timeout: Duration) -> Self {
262 Self {
263 batch_size,
264 batch_timeout,
265 pending_messages: Arc::new(Mutex::new(VecDeque::new())),
266 last_flush: Arc::new(Mutex::new(Instant::now())),
267 }
268 }
269
270 pub async fn add_message(&self, message: Vec<u8>) -> Result<(), PerformanceError> {
271 let mut pending = self.pending_messages.lock().unwrap();
272 pending.push_back(message);
273
274 if pending.len() >= self.batch_size {
276 drop(pending);
277 self.flush_messages().await;
278 }
279
280 Ok(())
281 }
282
283 pub async fn flush_messages(&self) -> Vec<Vec<u8>> {
284 let mut pending = self.pending_messages.lock().unwrap();
285 let messages: Vec<_> = pending.drain(..).collect();
286 *self.last_flush.lock().unwrap() = Instant::now();
287 messages
288 }
289
290 pub fn should_flush(&self) -> bool {
291 let pending = self.pending_messages.lock().unwrap();
292 let last_flush = self.last_flush.lock().unwrap();
293
294 !pending.is_empty() &&
295 (pending.len() >= self.batch_size ||
296 last_flush.elapsed() >= self.batch_timeout)
297 }
298
299 pub fn pending_count(&self) -> usize {
300 self.pending_messages.lock().unwrap().len()
301 }
302}
303
304pub struct MessageCache {
306 cache: Arc<RwLock<HashMap<String, CacheEntry>>>,
307 max_size: usize,
308 ttl: Duration,
309}
310
311impl MessageCache {
312 pub fn new(max_size: usize, ttl: Duration) -> Self {
313 Self {
314 cache: Arc::new(RwLock::new(HashMap::new())),
315 max_size,
316 ttl,
317 }
318 }
319
320 pub async fn get(&self, key: &str) -> Option<Vec<u8>> {
321 let cache = self.cache.read().await;
322
323 if let Some(entry) = cache.get(key) {
324 if entry.expires_at > Instant::now() {
325 Some(entry.value.clone())
326 } else {
327 None }
329 } else {
330 None
331 }
332 }
333
334 pub async fn set(&self, key: String, value: Vec<u8>) {
335 let mut cache = self.cache.write().await;
336
337 if cache.len() >= self.max_size {
339 self.evict_oldest(&mut cache);
340 }
341
342 cache.insert(key, CacheEntry {
343 value,
344 created_at: Instant::now(),
345 expires_at: Instant::now() + self.ttl,
346 access_count: 1,
347 });
348 }
349
350 fn evict_oldest(&self, cache: &mut HashMap<String, CacheEntry>) {
351 if let Some(oldest_key) = cache.iter()
352 .min_by_key(|(_, entry)| entry.created_at)
353 .map(|(key, _)| key.clone())
354 {
355 cache.remove(&oldest_key);
356 }
357 }
358
359 pub async fn cleanup_expired(&self) {
360 let mut cache = self.cache.write().await;
361 let now = Instant::now();
362
363 cache.retain(|_, entry| entry.expires_at > now);
364 }
365
366 pub async fn stats(&self) -> CacheStats {
367 let cache = self.cache.read().await;
368
369 CacheStats {
370 size: cache.len(),
371 capacity: self.max_size,
372 hit_ratio: 0.0, }
374 }
375}
376
377#[derive(Debug, Clone)]
378struct CacheEntry {
379 value: Vec<u8>,
380 created_at: Instant,
381 expires_at: Instant,
382 access_count: u64,
383}
384
385#[derive(Debug, Clone)]
386pub struct CacheStats {
387 pub size: usize,
388 pub capacity: usize,
389 pub hit_ratio: f64,
390}
391
392pub struct MetricsCollector {
394 metrics: Arc<RwLock<HashMap<String, MetricValue>>>,
395 start_time: Instant,
396}
397
398impl MetricsCollector {
399 pub fn new() -> Self {
400 Self {
401 metrics: Arc::new(RwLock::new(HashMap::new())),
402 start_time: Instant::now(),
403 }
404 }
405
406 pub fn record_metric(&self, name: &str, value: f64, tags: Option<HashMap<String, String>>) {
407 let metric = MetricValue {
408 value,
409 timestamp: Instant::now(),
410 tags: tags.unwrap_or_default(),
411 };
412
413 tokio::spawn({
414 let metrics = self.metrics.clone();
415 let name = name.to_string();
416 async move {
417 let mut metrics = metrics.write().await;
418 metrics.insert(name, metric);
419 }
420 });
421 }
422
423 pub fn get_metrics(&self) -> PerformanceMetrics {
424 PerformanceMetrics {
427 uptime: self.start_time.elapsed(),
428 total_requests: 0,
429 requests_per_second: 0.0,
430 average_response_time: Duration::from_millis(0),
431 memory_usage: 0,
432 cpu_usage: 0.0,
433 active_connections: 0,
434 message_throughput: 0.0,
435 }
436 }
437}
438
439#[derive(Debug, Clone)]
440struct MetricValue {
441 value: f64,
442 timestamp: Instant,
443 tags: HashMap<String, String>,
444}
445
446#[derive(Debug, Clone, Serialize, Deserialize)]
448pub struct PerformanceMetrics {
449 pub uptime: Duration,
450 pub total_requests: u64,
451 pub requests_per_second: f64,
452 pub average_response_time: Duration,
453 pub memory_usage: u64,
454 pub cpu_usage: f64,
455 pub active_connections: u32,
456 pub message_throughput: f64,
457}
458
459#[derive(Debug, thiserror::Error)]
461pub enum PerformanceError {
462 #[error("Connection pooling is disabled")]
463 PoolingDisabled,
464
465 #[error("Connection pool exhausted")]
466 PoolExhausted,
467
468 #[error("Message batching is disabled")]
469 BatchingDisabled,
470
471 #[error("Cache operation failed: {0}")]
472 CacheError(String),
473
474 #[error("Metrics collection failed: {0}")]
475 MetricsError(String),
476}
477
478pub struct PerformanceProfiler {
480 samples: HashMap<String, Vec<Duration>>,
481 active_spans: HashMap<String, Instant>,
482}
483
484impl PerformanceProfiler {
485 pub fn new() -> Self {
486 Self {
487 samples: HashMap::new(),
488 active_spans: HashMap::new(),
489 }
490 }
491
492 pub fn start_span(&mut self, name: &str) {
493 self.active_spans.insert(name.to_string(), Instant::now());
494 }
495
496 pub fn end_span(&mut self, name: &str) {
497 if let Some(start_time) = self.active_spans.remove(name) {
498 let duration = start_time.elapsed();
499 self.samples.entry(name.to_string()).or_insert_with(Vec::new).push(duration);
500 }
501 }
502
503 pub fn get_stats(&self, name: &str) -> Option<SpanStats> {
504 self.samples.get(name).map(|samples| {
505 let sum: Duration = samples.iter().sum();
506 let avg = sum / samples.len() as u32;
507 let min = *samples.iter().min().unwrap();
508 let max = *samples.iter().max().unwrap();
509
510 SpanStats {
511 count: samples.len(),
512 average: avg,
513 min,
514 max,
515 total: sum,
516 }
517 })
518 }
519}
520
521#[derive(Debug, Clone)]
522pub struct SpanStats {
523 pub count: usize,
524 pub average: Duration,
525 pub min: Duration,
526 pub max: Duration,
527 pub total: Duration,
528}
529
530impl Default for PerformanceProfiler {
531 fn default() -> Self {
532 Self::new()
533 }
534}
535
536#[cfg(test)]
537mod tests {
538 use super::*;
539
540 #[tokio::test]
541 async fn test_connection_pool() {
542 let pool = ConnectionPool::new(2);
543
544 let conn1 = pool.get_connection("ws://localhost:8080").await.unwrap();
545 let conn2 = pool.get_connection("ws://localhost:8080").await.unwrap();
546
547 assert!(pool.get_connection("ws://localhost:8080").await.is_err());
549
550 pool.return_connection(conn1).await;
552
553 assert!(pool.get_connection("ws://localhost:8080").await.is_ok());
555 }
556
557 #[tokio::test]
558 async fn test_message_batcher() {
559 let batcher = MessageBatcher::new(3, Duration::from_millis(100));
560
561 batcher.add_message(b"message1".to_vec()).await.unwrap();
562 batcher.add_message(b"message2".to_vec()).await.unwrap();
563
564 assert_eq!(batcher.pending_count(), 2);
565
566 batcher.add_message(b"message3".to_vec()).await.unwrap(); assert_eq!(batcher.pending_count(), 0);
569 }
570
571 #[tokio::test]
572 async fn test_message_cache() {
573 let cache = MessageCache::new(2, Duration::from_secs(1));
574
575 cache.set("key1".to_string(), b"value1".to_vec()).await;
576 cache.set("key2".to_string(), b"value2".to_vec()).await;
577
578 assert_eq!(cache.get("key1").await, Some(b"value1".to_vec()));
579 assert_eq!(cache.get("key2").await, Some(b"value2".to_vec()));
580
581 cache.set("key3".to_string(), b"value3".to_vec()).await;
583
584 let stats = cache.stats().await;
585 assert_eq!(stats.size, 2);
586 }
587
588 #[test]
589 fn test_profiler() {
590 let mut profiler = PerformanceProfiler::new();
591
592 profiler.start_span("test_operation");
593 std::thread::sleep(Duration::from_millis(10));
594 profiler.end_span("test_operation");
595
596 let stats = profiler.get_stats("test_operation").unwrap();
597 assert_eq!(stats.count, 1);
598 assert!(stats.average >= Duration::from_millis(10));
599 }
600}