leptos_ws_pro/
performance.rs

//! Performance Optimization Module
//!
//! High-performance features including connection pooling, message batching,
//! caching, and performance monitoring.
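//!
//! # Example
//!
//! A minimal wiring sketch (illustrative only; it assumes this file is
//! exposed as the `performance` module of the `leptos_ws_pro` crate, and the
//! URL and payload below are placeholders):
//!
//! ```no_run
//! use leptos_ws_pro::performance::{PerformanceConfig, PerformanceManager};
//!
//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
//! let manager = PerformanceManager::new(PerformanceConfig::default());
//!
//! // Check out a pooled connection, use it, and return it.
//! let conn = manager.get_connection("ws://localhost:8080").await?;
//! manager.return_connection(conn).await;
//!
//! // Queue a message, then drain the pending batch.
//! manager.queue_message(b"hello".to_vec()).await?;
//! let batches = manager.flush_messages().await?;
//! assert_eq!(batches.len(), 1);
//! # Ok(())
//! # }
//! ```
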
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;

/// Performance configuration
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
    pub enable_connection_pooling: bool,
    /// Maximum pooled connections across all URLs
    pub max_pool_size: usize,
    pub enable_message_batching: bool,
    /// Messages per batch before an automatic flush
    pub batch_size: usize,
    pub batch_timeout: Duration,
    pub enable_caching: bool,
    /// Maximum number of cache entries
    pub cache_size: usize,
    pub cache_ttl: Duration,
    pub enable_compression: bool,
    /// Minimum message size, in bytes, before compression is applied
    pub compression_threshold: usize,
    pub enable_metrics: bool,
}

impl Default for PerformanceConfig {
    fn default() -> Self {
        Self {
            enable_connection_pooling: true,
            max_pool_size: 10,
            enable_message_batching: true,
            batch_size: 100,
            batch_timeout: Duration::from_millis(10),
            enable_caching: true,
            cache_size: 1000,
            cache_ttl: Duration::from_secs(300),
            enable_compression: true,
            compression_threshold: 1024,
            enable_metrics: true,
        }
    }
}

/// Performance manager coordinating all optimizations
pub struct PerformanceManager {
    config: PerformanceConfig,
    connection_pool: Option<ConnectionPool>,
    message_batcher: Option<MessageBatcher>,
    cache: Option<MessageCache>,
    metrics_collector: Option<MetricsCollector>,
}

impl PerformanceManager {
    pub fn new(config: PerformanceConfig) -> Self {
        // Each subsystem is constructed only when enabled in the config.
        let connection_pool = config
            .enable_connection_pooling
            .then(|| ConnectionPool::new(config.max_pool_size));

        let message_batcher = config
            .enable_message_batching
            .then(|| MessageBatcher::new(config.batch_size, config.batch_timeout));

        let cache = config
            .enable_caching
            .then(|| MessageCache::new(config.cache_size, config.cache_ttl));

        let metrics_collector = config.enable_metrics.then(MetricsCollector::new);

        Self {
            config,
            connection_pool,
            message_batcher,
            cache,
            metrics_collector,
        }
    }

    /// Get or create a connection from the pool
    pub async fn get_connection(&self, url: &str) -> Result<PooledConnection, PerformanceError> {
        if let Some(pool) = &self.connection_pool {
            pool.get_connection(url).await
        } else {
            Err(PerformanceError::PoolingDisabled)
        }
    }

    /// Return a connection to the pool
    pub async fn return_connection(&self, connection: PooledConnection) {
        if let Some(pool) = &self.connection_pool {
            pool.return_connection(connection).await;
        }
    }

    /// Queue a message for batching. Returns `Ok(Some(batch))` when this
    /// message filled the batch and triggered a flush.
    pub async fn queue_message(
        &self,
        message: Vec<u8>,
    ) -> Result<Option<Vec<Vec<u8>>>, PerformanceError> {
        if let Some(batcher) = &self.message_batcher {
            batcher.add_message(message).await
        } else {
            Err(PerformanceError::BatchingDisabled)
        }
    }

    /// Flush pending batched messages
    pub async fn flush_messages(&self) -> Result<Vec<Vec<u8>>, PerformanceError> {
        if let Some(batcher) = &self.message_batcher {
            Ok(batcher.flush_messages().await)
        } else {
            Ok(vec![])
        }
    }

    /// Get a cached message
    pub async fn get_cached(&self, key: &str) -> Option<Vec<u8>> {
        if let Some(cache) = &self.cache {
            cache.get(key).await
        } else {
            None
        }
    }

    /// Cache a message
    pub async fn set_cached(&self, key: String, value: Vec<u8>) {
        if let Some(cache) = &self.cache {
            cache.set(key, value).await;
        }
    }

    /// Record a performance metric (no-op when metrics are disabled)
    pub fn record_metric(&self, name: &str, value: f64, tags: Option<HashMap<String, String>>) {
        if let Some(collector) = &self.metrics_collector {
            collector.record_metric(name, value, tags);
        }
    }

    /// Get current performance metrics
    pub fn get_metrics(&self) -> Option<PerformanceMetrics> {
        self.metrics_collector.as_ref().map(|c| c.get_metrics())
    }

    /// Whether a message of `message_size` bytes should be compressed
    pub fn should_compress(&self, message_size: usize) -> bool {
        self.config.enable_compression && message_size >= self.config.compression_threshold
    }
}

/// Connection pool for reusing WebSocket connections
pub struct ConnectionPool {
    max_size: usize,
    connections: Arc<RwLock<HashMap<String, VecDeque<PooledConnection>>>>,
    /// Total live connections across all URLs (pooled and checked out)
    total_connections: Arc<Mutex<usize>>,
}

impl ConnectionPool {
    pub fn new(max_size: usize) -> Self {
        Self {
            max_size,
            connections: Arc::new(RwLock::new(HashMap::new())),
            total_connections: Arc::new(Mutex::new(0)),
        }
    }

    pub async fn get_connection(&self, url: &str) -> Result<PooledConnection, PerformanceError> {
        let mut connections = self.connections.write().await;

        // Reuse an idle connection for this URL if one is available.
        if let Some(pool) = connections.get_mut(url) {
            if let Some(mut connection) = pool.pop_front() {
                connection.mark_used();
                return Ok(connection);
            }
        }

        // No idle connection; create a new one if under the global limit.
        // One guard covers both the check and the increment, so the count
        // cannot race between them.
        let mut total = self.total_connections.lock().unwrap();
        if *total < self.max_size {
            *total += 1;
            Ok(PooledConnection::new(url.to_string()))
        } else {
            Err(PerformanceError::PoolExhausted)
        }
    }

    pub async fn return_connection(&self, connection: PooledConnection) {
        if connection.is_healthy() {
            let mut connections = self.connections.write().await;
            let pool = connections.entry(connection.url.clone()).or_default();
            pool.push_back(connection);
        } else {
            // Drop the unhealthy connection and release its slot.
            *self.total_connections.lock().unwrap() -= 1;
        }
    }

    pub async fn cleanup_idle_connections(&self) {
        let mut connections = self.connections.write().await;

        // Connections idle for more than five minutes are dropped.
        let Some(cutoff) = Instant::now().checked_sub(Duration::from_secs(300)) else {
            return;
        };

        for pool in connections.values_mut() {
            let original_len = pool.len();
            pool.retain(|conn| conn.last_used > cutoff);
            let removed = original_len - pool.len();

            if removed > 0 {
                *self.total_connections.lock().unwrap() -= removed;
            }
        }
    }
}

/// Pooled connection wrapper
#[derive(Debug, Clone)]
pub struct PooledConnection {
    pub url: String,
    pub created_at: Instant,
    pub last_used: Instant,
    pub request_count: u64,
    pub is_connected: bool,
}

impl PooledConnection {
    pub fn new(url: String) -> Self {
        let now = Instant::now();
        Self {
            url,
            created_at: now,
            last_used: now,
            request_count: 0,
            is_connected: true,
        }
    }

    /// A connection is healthy while it is connected and has been used
    /// within the last 60 seconds.
    pub fn is_healthy(&self) -> bool {
        self.is_connected && self.last_used.elapsed() < Duration::from_secs(60)
    }

    pub fn mark_used(&mut self) {
        self.last_used = Instant::now();
        self.request_count += 1;
    }
}

/// Message batcher for improving throughput
pub struct MessageBatcher {
    batch_size: usize,
    batch_timeout: Duration,
    pending_messages: Arc<Mutex<VecDeque<Vec<u8>>>>,
    last_flush: Arc<Mutex<Instant>>,
}

impl MessageBatcher {
    pub fn new(batch_size: usize, batch_timeout: Duration) -> Self {
        Self {
            batch_size,
            batch_timeout,
            pending_messages: Arc::new(Mutex::new(VecDeque::new())),
            last_flush: Arc::new(Mutex::new(Instant::now())),
        }
    }

    /// Queue a message. When the queue reaches `batch_size`, the whole batch
    /// is flushed and handed back to the caller so no messages are lost.
    pub async fn add_message(
        &self,
        message: Vec<u8>,
    ) -> Result<Option<Vec<Vec<u8>>>, PerformanceError> {
        let mut pending = self.pending_messages.lock().unwrap();
        pending.push_back(message);

        // Auto-flush when the batch is full.
        if pending.len() >= self.batch_size {
            let batch: Vec<_> = pending.drain(..).collect();
            *self.last_flush.lock().unwrap() = Instant::now();
            return Ok(Some(batch));
        }

        Ok(None)
    }

    /// Drain and return all pending messages.
    pub async fn flush_messages(&self) -> Vec<Vec<u8>> {
        let mut pending = self.pending_messages.lock().unwrap();
        let messages: Vec<_> = pending.drain(..).collect();
        *self.last_flush.lock().unwrap() = Instant::now();
        messages
    }

    /// True when messages are pending and either the batch is full or the
    /// batch timeout has elapsed since the last flush.
    pub fn should_flush(&self) -> bool {
        let pending = self.pending_messages.lock().unwrap();
        let last_flush = self.last_flush.lock().unwrap();

        !pending.is_empty()
            && (pending.len() >= self.batch_size || last_flush.elapsed() >= self.batch_timeout)
    }

    pub fn pending_count(&self) -> usize {
        self.pending_messages.lock().unwrap().len()
    }
}

/// High-performance message cache
pub struct MessageCache {
    cache: Arc<RwLock<HashMap<String, CacheEntry>>>,
    max_size: usize,
    ttl: Duration,
}

impl MessageCache {
    pub fn new(max_size: usize, ttl: Duration) -> Self {
        Self {
            cache: Arc::new(RwLock::new(HashMap::new())),
            max_size,
            ttl,
        }
    }

    /// Look up a key. Expired entries read as misses and are left in place
    /// for `cleanup_expired` to reap.
    pub async fn get(&self, key: &str) -> Option<Vec<u8>> {
        let cache = self.cache.read().await;

        match cache.get(key) {
            Some(entry) if entry.expires_at > Instant::now() => Some(entry.value.clone()),
            _ => None,
        }
    }

    pub async fn set(&self, key: String, value: Vec<u8>) {
        let mut cache = self.cache.write().await;

        // Evict the oldest entry only when inserting a new key at capacity;
        // overwriting an existing key needs no eviction.
        if cache.len() >= self.max_size && !cache.contains_key(&key) {
            self.evict_oldest(&mut cache);
        }

        cache.insert(key, CacheEntry {
            value,
            created_at: Instant::now(),
            expires_at: Instant::now() + self.ttl,
            access_count: 1,
        });
    }

    fn evict_oldest(&self, cache: &mut HashMap<String, CacheEntry>) {
        if let Some(oldest_key) = cache
            .iter()
            .min_by_key(|(_, entry)| entry.created_at)
            .map(|(key, _)| key.clone())
        {
            cache.remove(&oldest_key);
        }
    }

    /// Remove all expired entries.
    pub async fn cleanup_expired(&self) {
        let mut cache = self.cache.write().await;
        let now = Instant::now();

        cache.retain(|_, entry| entry.expires_at > now);
    }

    pub async fn stats(&self) -> CacheStats {
        let cache = self.cache.read().await;

        CacheStats {
            size: cache.len(),
            capacity: self.max_size,
            hit_ratio: 0.0, // placeholder: hit/miss counters are not tracked yet
        }
    }
}

#[derive(Debug, Clone)]
struct CacheEntry {
    value: Vec<u8>,
    created_at: Instant,
    expires_at: Instant,
    /// Written on insert but not yet read; reserved for hit-ratio tracking
    access_count: u64,
}

#[derive(Debug, Clone)]
pub struct CacheStats {
    pub size: usize,
    pub capacity: usize,
    pub hit_ratio: f64,
}

/// Performance metrics collector
pub struct MetricsCollector {
    metrics: Arc<RwLock<HashMap<String, MetricValue>>>,
    start_time: Instant,
}

impl MetricsCollector {
    pub fn new() -> Self {
        Self {
            metrics: Arc::new(RwLock::new(HashMap::new())),
            start_time: Instant::now(),
        }
    }

    /// Record a metric, overwriting any previous value under the same name.
    /// The write happens on a spawned task; outside a Tokio runtime the
    /// metric is silently dropped instead of panicking.
    pub fn record_metric(&self, name: &str, value: f64, tags: Option<HashMap<String, String>>) {
        let metric = MetricValue {
            value,
            timestamp: Instant::now(),
            tags: tags.unwrap_or_default(),
        };

        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let metrics = self.metrics.clone();
            let name = name.to_string();
            handle.spawn(async move {
                let mut metrics = metrics.write().await;
                metrics.insert(name, metric);
            });
        }
    }

    /// Snapshot of current metrics. Only uptime is populated: reading the
    /// async metric map from this sync method would require a runtime
    /// handle, so the remaining counters are placeholders.
    pub fn get_metrics(&self) -> PerformanceMetrics {
        PerformanceMetrics {
            uptime: self.start_time.elapsed(),
            total_requests: 0,
            requests_per_second: 0.0,
            average_response_time: Duration::from_millis(0),
            memory_usage: 0,
            cpu_usage: 0.0,
            active_connections: 0,
            message_throughput: 0.0,
        }
    }
}
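
// Hedged addition: mirrors the `Default` impl already provided for
// `PerformanceProfiler` below, since `new()` takes no arguments.
impl Default for MetricsCollector {
    fn default() -> Self {
        Self::new()
    }
}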

#[derive(Debug, Clone)]
struct MetricValue {
    value: f64,
    timestamp: Instant,
    tags: HashMap<String, String>,
}

/// Performance metrics snapshot
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    pub uptime: Duration,
    pub total_requests: u64,
    pub requests_per_second: f64,
    pub average_response_time: Duration,
    pub memory_usage: u64,
    pub cpu_usage: f64,
    pub active_connections: u32,
    pub message_throughput: f64,
}

/// Performance-related errors
#[derive(Debug, thiserror::Error)]
pub enum PerformanceError {
    #[error("Connection pooling is disabled")]
    PoolingDisabled,

    #[error("Connection pool exhausted")]
    PoolExhausted,

    #[error("Message batching is disabled")]
    BatchingDisabled,

    #[error("Cache operation failed: {0}")]
    CacheError(String),

    #[error("Metrics collection failed: {0}")]
    MetricsError(String),
}

/// Performance profiler for hot-path optimization
pub struct PerformanceProfiler {
    samples: HashMap<String, Vec<Duration>>,
    active_spans: HashMap<String, Instant>,
}

impl PerformanceProfiler {
    pub fn new() -> Self {
        Self {
            samples: HashMap::new(),
            active_spans: HashMap::new(),
        }
    }

    /// Begin timing a named span; spans with the same name do not nest.
    pub fn start_span(&mut self, name: &str) {
        self.active_spans.insert(name.to_string(), Instant::now());
    }

    pub fn end_span(&mut self, name: &str) {
        if let Some(start_time) = self.active_spans.remove(name) {
            let duration = start_time.elapsed();
            self.samples
                .entry(name.to_string())
                .or_insert_with(Vec::new)
                .push(duration);
        }
    }

    pub fn get_stats(&self, name: &str) -> Option<SpanStats> {
        self.samples.get(name).map(|samples| {
            // `samples` is never empty: an entry is only created on push,
            // so the unwraps below cannot fail.
            let sum: Duration = samples.iter().sum();
            let avg = sum / samples.len() as u32;
            let min = *samples.iter().min().unwrap();
            let max = *samples.iter().max().unwrap();

            SpanStats {
                count: samples.len(),
                average: avg,
                min,
                max,
                total: sum,
            }
        })
    }
}

#[derive(Debug, Clone)]
pub struct SpanStats {
    pub count: usize,
    pub average: Duration,
    pub min: Duration,
    pub max: Duration,
    pub total: Duration,
}

impl Default for PerformanceProfiler {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_connection_pool() {
        let pool = ConnectionPool::new(2);

        let conn1 = pool.get_connection("ws://localhost:8080").await.unwrap();
        let _conn2 = pool.get_connection("ws://localhost:8080").await.unwrap();

        // Pool is now exhausted.
        assert!(pool.get_connection("ws://localhost:8080").await.is_err());

        // Returning a healthy connection frees it for reuse.
        pool.return_connection(conn1).await;
        assert!(pool.get_connection("ws://localhost:8080").await.is_ok());
    }
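
    // Hedged addition: exercises `PooledConnection` bookkeeping directly,
    // using only items defined in this module.
    #[test]
    fn test_pooled_connection_mark_used() {
        let mut conn = PooledConnection::new("ws://localhost:8080".to_string());
        assert!(conn.is_healthy());
        assert_eq!(conn.request_count, 0);

        conn.mark_used();
        assert_eq!(conn.request_count, 1);

        // A disconnected connection is never considered healthy.
        conn.is_connected = false;
        assert!(!conn.is_healthy());
    }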

    #[tokio::test]
    async fn test_message_batcher() {
        let batcher = MessageBatcher::new(3, Duration::from_millis(100));

        batcher.add_message(b"message1".to_vec()).await.unwrap();
        batcher.add_message(b"message2".to_vec()).await.unwrap();

        assert_eq!(batcher.pending_count(), 2);

        // The third message fills the batch and triggers an auto-flush,
        // which hands the batch back to the caller.
        let batch = batcher.add_message(b"message3".to_vec()).await.unwrap();
        assert_eq!(batch.map(|b| b.len()), Some(3));
        assert_eq!(batcher.pending_count(), 0);
    }
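
    // Hedged addition: a sketch of the timeout-driven flush path. The 50 ms
    // sleep is an arbitrary illustrative value that comfortably exceeds the
    // 10 ms batch timeout; it assumes tokio's timer (`tokio::time`) is
    // available in the test runtime.
    #[tokio::test]
    async fn test_batcher_timeout_flush() {
        let batcher = MessageBatcher::new(100, Duration::from_millis(10));

        batcher.add_message(b"lonely".to_vec()).await.unwrap();
        assert_eq!(batcher.pending_count(), 1);

        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(batcher.should_flush());
        assert_eq!(batcher.flush_messages().await.len(), 1);
    }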

    #[tokio::test]
    async fn test_message_cache() {
        let cache = MessageCache::new(2, Duration::from_secs(1));

        cache.set("key1".to_string(), b"value1".to_vec()).await;
        cache.set("key2".to_string(), b"value2".to_vec()).await;

        assert_eq!(cache.get("key1").await, Some(b"value1".to_vec()));
        assert_eq!(cache.get("key2").await, Some(b"value2".to_vec()));

        // Should evict the oldest entry when at capacity.
        cache.set("key3".to_string(), b"value3".to_vec()).await;

        let stats = cache.stats().await;
        assert_eq!(stats.size, 2);
    }
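
    // Hedged addition: verifies TTL expiry with a short-lived cache. The
    // 30 ms TTL and 60 ms sleep are illustrative values, not tuned
    // recommendations.
    #[tokio::test]
    async fn test_cache_ttl_expiry() {
        let cache = MessageCache::new(8, Duration::from_millis(30));

        cache.set("ephemeral".to_string(), b"v".to_vec()).await;
        assert_eq!(cache.get("ephemeral").await, Some(b"v".to_vec()));

        // After the TTL elapses the entry reads as a miss...
        tokio::time::sleep(Duration::from_millis(60)).await;
        assert_eq!(cache.get("ephemeral").await, None);

        // ...and cleanup_expired() removes it from the map itself.
        cache.cleanup_expired().await;
        assert_eq!(cache.stats().await.size, 0);
    }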

    #[test]
    fn test_profiler() {
        let mut profiler = PerformanceProfiler::new();

        profiler.start_span("test_operation");
        std::thread::sleep(Duration::from_millis(10));
        profiler.end_span("test_operation");

        let stats = profiler.get_stats("test_operation").unwrap();
        assert_eq!(stats.count, 1);
        assert!(stats.average >= Duration::from_millis(10));
    }
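
    // Hedged addition: a minimal end-to-end sketch of `PerformanceManager`
    // with the default configuration; the thresholds asserted below come
    // from `PerformanceConfig::default()`.
    #[tokio::test]
    async fn test_performance_manager_defaults() {
        let manager = PerformanceManager::new(PerformanceConfig::default());

        // Default compression threshold is 1024 bytes.
        assert!(!manager.should_compress(512));
        assert!(manager.should_compress(2048));

        // Batching is enabled by default, so queueing succeeds.
        manager.queue_message(b"hello".to_vec()).await.unwrap();
        let flushed = manager.flush_messages().await.unwrap();
        assert_eq!(flushed.len(), 1);

        // Cache round-trip through the manager.
        manager.set_cached("k".to_string(), b"v".to_vec()).await;
        assert_eq!(manager.get_cached("k").await, Some(b"v".to_vec()));
    }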
}