use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use serde::{Serialize, Deserialize};
use tokio::sync::RwLock;

#[derive(Debug, Clone)]
pub struct PerformanceConfig {
    pub enable_connection_pooling: bool,
    pub max_pool_size: usize,
    pub enable_message_batching: bool,
    pub batch_size: usize,
    pub batch_timeout: Duration,
    pub enable_caching: bool,
    pub cache_size: usize,
    pub cache_ttl: Duration,
    pub enable_compression: bool,
    pub compression_threshold: usize,
    pub enable_metrics: bool,
}

impl Default for PerformanceConfig {
    fn default() -> Self {
        Self {
            enable_connection_pooling: true,
            max_pool_size: 10,
            enable_message_batching: true,
            batch_size: 100,
            batch_timeout: Duration::from_millis(10),
            enable_caching: true,
            cache_size: 1000,
            cache_ttl: Duration::from_secs(300),
            enable_compression: true,
            compression_threshold: 1024,
            enable_metrics: true,
        }
    }
}

pub struct PerformanceManager {
    config: PerformanceConfig,
    connection_pool: Option<ConnectionPool>,
    message_batcher: Option<MessageBatcher>,
    cache: Option<MessageCache>,
    metrics_collector: Option<MetricsCollector>,
    memory_monitor: Option<MemoryMonitor>,
    cpu_throttler: Option<CpuThrottler>,
    network_optimizer: Option<NetworkOptimizer>,
}

impl PerformanceManager {
    pub fn new(config: impl Into<PerformanceConfig>) -> Self {
        Self::new_with_config(config.into())
    }

    // Delegates to new(), which applies the From<MemoryPressureConfig>
    // conversion defined later in this file instead of rebuilding the
    // same defaults by hand.
    pub fn new_with_memory_config(config: MemoryPressureConfig) -> Self {
        Self::new(config)
    }

    fn new_with_config(config: PerformanceConfig) -> Self {
        let connection_pool = if config.enable_connection_pooling {
            Some(ConnectionPool::new_simple(config.max_pool_size))
        } else {
            None
        };

        let message_batcher = if config.enable_message_batching {
            Some(MessageBatcher::new(config.batch_size, config.batch_timeout))
        } else {
            None
        };

        let cache = if config.enable_caching {
            Some(MessageCache::new(config.cache_size, config.cache_ttl))
        } else {
            None
        };

        let metrics_collector = if config.enable_metrics {
            Some(MetricsCollector::new())
        } else {
            None
        };

        let memory_monitor = Some(MemoryMonitor::new());
        let cpu_throttler = Some(CpuThrottler::new());
        let network_optimizer = Some(NetworkOptimizer::new());

        Self {
            config,
            connection_pool,
            message_batcher,
            cache,
            metrics_collector,
            memory_monitor,
            cpu_throttler,
            network_optimizer,
        }
    }

    pub async fn get_connection(&self, url: &str) -> Result<PooledConnection, PerformanceError> {
        if let Some(pool) = &self.connection_pool {
            pool.get_connection(url).await
        } else {
            Err(PerformanceError::PoolingDisabled)
        }
    }

    pub async fn return_connection(&self, connection: PooledConnection) {
        if let Some(pool) = &self.connection_pool {
            pool.return_connection(connection).await;
        }
    }

    pub async fn queue_message(&self, message: Vec<u8>) -> Result<(), PerformanceError> {
        if let Some(batcher) = &self.message_batcher {
            batcher.add_message(message).await
        } else {
            Err(PerformanceError::BatchingDisabled)
        }
    }

    pub async fn flush_messages(&self) -> Result<Vec<Vec<u8>>, PerformanceError> {
        if let Some(batcher) = &self.message_batcher {
            Ok(batcher.flush_messages().await)
        } else {
            Ok(vec![])
        }
    }

    pub async fn get_cached(&self, key: &str) -> Option<Vec<u8>> {
        if let Some(cache) = &self.cache {
            cache.get(key).await
        } else {
            None
        }
    }

    pub async fn set_cached(&self, key: String, value: Vec<u8>) {
        if let Some(cache) = &self.cache {
            cache.set(key, value).await;
        }
    }

    pub fn record_metric(&self, name: &str, value: f64, tags: Option<HashMap<String, String>>) {
        if let Some(collector) = &self.metrics_collector {
            collector.record_metric(name, value, tags);
        }
    }

    pub fn get_metrics(&self) -> Option<PerformanceMetrics> {
        self.metrics_collector.as_ref().map(|c| c.get_metrics())
    }

    pub fn should_compress(&self, message_size: usize) -> bool {
        self.config.enable_compression && message_size >= self.config.compression_threshold
    }

    // Takes &self: caching and memory accounting both use interior
    // mutability, so exclusive access is not required.
    pub async fn cache_message(&self, data: PerformanceTestData) {
        if let Some(cache) = &self.cache {
            let key = format!("msg_{}", data.id);
            cache.set(key, data.payload.clone()).await;
        }

        // Account the payload against the memory budget.
        if let Some(monitor) = &self.memory_monitor {
            monitor.add_memory(data.payload.len());
        }
    }

    pub async fn get_memory_usage(&self) -> f64 {
        if let Some(monitor) = &self.memory_monitor {
            monitor.get_memory_usage().await
        } else {
            0.0
        }
    }

    pub async fn is_memory_pressure_detected(&self) -> bool {
        if let Some(monitor) = &self.memory_monitor {
            monitor.is_pressure_detected().await
        } else {
            false
        }
    }

    pub async fn set_cpu_threshold(&self, threshold: f64) {
        if let Some(throttler) = &self.cpu_throttler {
            throttler.set_threshold(threshold).await;
        }
    }

    pub async fn schedule_cpu_task<F, T>(&self, task: F) -> tokio::task::JoinHandle<Result<T, PerformanceError>>
    where
        F: std::future::Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        if let Some(throttler) = &self.cpu_throttler {
            throttler.schedule_task(task).await
        } else {
            tokio::spawn(async { Err(PerformanceError::MetricsError("CPU throttling disabled".to_string())) })
        }
    }

    pub async fn get_cpu_usage(&self) -> f64 {
        if let Some(throttler) = &self.cpu_throttler {
            throttler.get_cpu_usage().await
        } else {
            0.0
        }
    }

    pub async fn optimize_bandwidth(&self, data: &[u8]) -> Result<Vec<u8>, PerformanceError> {
        if let Some(optimizer) = &self.network_optimizer {
            optimizer.optimize(data).await
        } else {
            Ok(data.to_vec())
        }
    }
}
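
// Usage sketch (not part of the original API surface): wires batching,
// caching, and the compression decision together the way a transport layer
// might. Key names and payload sizes below are illustrative assumptions.
#[cfg(test)]
mod manager_usage_example {
    use super::*;

    #[tokio::test]
    async fn batches_and_caches_outbound_messages() {
        let manager = PerformanceManager::new(PerformanceConfig::default());

        // Queue a few small messages; they stay pending until flushed
        // because the default batch_size of 100 is never reached.
        for i in 0..3u8 {
            manager.queue_message(vec![i; 64]).await.unwrap();
        }
        let batch = manager.flush_messages().await.unwrap();
        assert_eq!(batch.len(), 3);

        // Cache a response and read it back.
        manager.set_cached("resp:1".to_string(), b"pong".to_vec()).await;
        assert_eq!(manager.get_cached("resp:1").await, Some(b"pong".to_vec()));

        // Compression only applies at or above the 1024-byte threshold.
        assert!(!manager.should_compress(512));
        assert!(manager.should_compress(2048));
    }
}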

#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    pub max_connections: usize,
    pub min_connections: usize,
}

pub struct ConnectionPool {
    max_size: usize,
    connections: Arc<RwLock<HashMap<String, VecDeque<PooledConnection>>>>,
    total_connections: Arc<Mutex<usize>>,
}

impl ConnectionPool {
    pub fn new_simple(max_size: usize) -> Self {
        Self {
            max_size,
            connections: Arc::new(RwLock::new(HashMap::new())),
            total_connections: Arc::new(Mutex::new(0)),
        }
    }

    pub async fn new(config: ConnectionPoolConfig) -> Result<Self, PerformanceError> {
        let pool = Self {
            max_size: config.max_connections,
            connections: Arc::new(RwLock::new(HashMap::new())),
            total_connections: Arc::new(Mutex::new(0)),
        };

        // Pre-populate the pool up to min_connections; the per-index URLs
        // are placeholder warm-up entries.
        for i in 0..config.min_connections {
            let url = format!("ws://localhost:8080/{}", i);
            let connection = PooledConnection::new(url);
            pool.connections.write().await
                .entry(connection.url.clone())
                .or_insert_with(VecDeque::new)
                .push_back(connection);
        }
        *pool.total_connections.lock().unwrap() = config.min_connections;

        Ok(pool)
    }

    pub async fn get_connection(&self, url: &str) -> Result<PooledConnection, PerformanceError> {
        let mut connections = self.connections.write().await;

        if let Some(pool) = connections.get_mut(url) {
            if let Some(connection) = pool.pop_front() {
                return Ok(connection);
            }
        }

        // Hold the counter lock across the check-and-increment so two
        // concurrent callers cannot both slip past the capacity check.
        let mut total = self.total_connections.lock().unwrap();
        if *total < self.max_size {
            *total += 1;
            Ok(PooledConnection::new(url.to_string()))
        } else {
            Err(PerformanceError::PoolExhausted)
        }
    }

    pub async fn return_connection(&self, connection: PooledConnection) {
        if connection.is_healthy() {
            let mut connections = self.connections.write().await;
            let pool = connections.entry(connection.url.clone()).or_insert_with(VecDeque::new);
            pool.push_back(connection);
        } else {
            // Unhealthy connections are dropped rather than pooled.
            *self.total_connections.lock().unwrap() -= 1;
        }
    }

    pub async fn cleanup_idle_connections(&self) {
        let mut connections = self.connections.write().await;
        // Drop connections that have been idle for more than five minutes.
        let cutoff = Instant::now() - Duration::from_secs(300);

        for pool in connections.values_mut() {
            let original_len = pool.len();
            pool.retain(|conn| conn.last_used > cutoff);
            let removed = original_len - pool.len();

            if removed > 0 {
                *self.total_connections.lock().unwrap() -= removed;
            }
        }
    }

    pub fn active_connections(&self) -> usize {
        *self.total_connections.lock().unwrap()
    }

    pub fn max_connections(&self) -> usize {
        self.max_size
    }

    pub async fn available_connections(&self) -> usize {
        let connections = self.connections.read().await;
        connections.values().map(|pool| pool.len()).sum()
    }

    pub async fn simulate_connection_failure(&self, count: usize) {
        let mut connections = self.connections.write().await;
        let mut removed = 0;

        for pool in connections.values_mut() {
            while removed < count && !pool.is_empty() {
                pool.pop_front();
                removed += 1;
            }
            if removed >= count {
                break;
            }
        }

        *self.total_connections.lock().unwrap() -= removed;
    }
}
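
// Checkout/return lifecycle sketch under the simple constructor. The URL is
// an illustrative assumption; connection counts reflect this module's
// bookkeeping, not real sockets.
#[cfg(test)]
mod pool_usage_example {
    use super::*;

    #[tokio::test]
    async fn reuses_returned_connections() {
        let pool = ConnectionPool::new_simple(4);

        let mut conn = pool.get_connection("ws://example/a").await.unwrap();
        conn.mark_used();
        pool.return_connection(conn).await;

        // The returned connection was healthy, so it is handed out again
        // instead of a fresh one being created.
        assert_eq!(pool.available_connections().await, 1);
        let again = pool.get_connection("ws://example/a").await.unwrap();
        assert_eq!(again.request_count, 1);
    }
}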

#[derive(Debug, Clone)]
pub struct PooledConnection {
    pub url: String,
    pub created_at: Instant,
    pub last_used: Instant,
    pub request_count: u64,
    pub is_connected: bool,
}

impl PooledConnection {
    pub fn new(url: String) -> Self {
        let now = Instant::now();
        Self {
            url,
            created_at: now,
            last_used: now,
            request_count: 0,
            is_connected: true,
        }
    }

    // A connection is reusable while it is connected and has been used
    // within the last 60 seconds.
    pub fn is_healthy(&self) -> bool {
        self.is_connected && self.last_used.elapsed() < Duration::from_secs(60)
    }

    pub fn mark_used(&mut self) {
        self.last_used = Instant::now();
        self.request_count += 1;
    }
}

pub struct MessageBatcher {
    batch_size: usize,
    batch_timeout: Duration,
    pending_messages: Arc<Mutex<VecDeque<Vec<u8>>>>,
    last_flush: Arc<Mutex<Instant>>,
}

impl MessageBatcher {
    pub fn new(batch_size: usize, batch_timeout: Duration) -> Self {
        Self {
            batch_size,
            batch_timeout,
            pending_messages: Arc::new(Mutex::new(VecDeque::new())),
            last_flush: Arc::new(Mutex::new(Instant::now())),
        }
    }

    pub async fn add_message(&self, message: Vec<u8>) -> Result<(), PerformanceError> {
        let mut pending = self.pending_messages.lock().unwrap();
        pending.push_back(message);

        if pending.len() >= self.batch_size {
            // Size-triggered flush. The guard must be released before the
            // await; note the drained batch is discarded here, so callers
            // that need the batch should poll should_flush() and call
            // flush_messages() themselves.
            drop(pending);
            self.flush_messages().await;
        }

        Ok(())
    }

    pub async fn flush_messages(&self) -> Vec<Vec<u8>> {
        let mut pending = self.pending_messages.lock().unwrap();
        let messages: Vec<_> = pending.drain(..).collect();
        *self.last_flush.lock().unwrap() = Instant::now();
        messages
    }

    pub fn should_flush(&self) -> bool {
        let pending = self.pending_messages.lock().unwrap();
        let last_flush = self.last_flush.lock().unwrap();

        !pending.is_empty() &&
            (pending.len() >= self.batch_size ||
             last_flush.elapsed() >= self.batch_timeout)
    }

    pub fn pending_count(&self) -> usize {
        self.pending_messages.lock().unwrap().len()
    }
}
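
// Sketch of the two flush triggers: size-based flushing happens inside
// add_message, while time-based flushing relies on the caller polling
// should_flush(). The sizes and timeouts here are illustrative.
#[cfg(test)]
mod batcher_usage_example {
    use super::*;

    #[tokio::test]
    async fn time_based_flush_is_the_callers_job() {
        let batcher = MessageBatcher::new(10, Duration::from_millis(5));
        batcher.add_message(b"hello".to_vec()).await.unwrap();

        // Below batch_size nothing flushes automatically...
        assert_eq!(batcher.pending_count(), 1);

        // ...until the timeout elapses and the caller notices via should_flush.
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(batcher.should_flush());
        let batch = batcher.flush_messages().await;
        assert_eq!(batch.len(), 1);
    }
}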

pub struct MessageCache {
    cache: Arc<RwLock<HashMap<String, CacheEntry>>>,
    max_size: usize,
    ttl: Duration,
}

impl MessageCache {
    pub fn new(max_size: usize, ttl: Duration) -> Self {
        Self {
            cache: Arc::new(RwLock::new(HashMap::new())),
            max_size,
            ttl,
        }
    }

    pub async fn get(&self, key: &str) -> Option<Vec<u8>> {
        let cache = self.cache.read().await;

        if let Some(entry) = cache.get(key) {
            if entry.expires_at > Instant::now() {
                Some(entry.value.clone())
            } else {
                // Expired entries read as misses; cleanup_expired() reclaims
                // their slots.
                None
            }
        } else {
            None
        }
    }

    pub async fn set(&self, key: String, value: Vec<u8>) {
        let mut cache = self.cache.write().await;

        // Evict only when inserting a genuinely new key at capacity;
        // overwriting an existing entry does not grow the map.
        if cache.len() >= self.max_size && !cache.contains_key(&key) {
            self.evict_oldest(&mut cache);
        }

        cache.insert(key, CacheEntry {
            value,
            created_at: Instant::now(),
            expires_at: Instant::now() + self.ttl,
            access_count: 1,
        });
    }

    // Evicts the entry with the oldest creation time (FIFO order).
    fn evict_oldest(&self, cache: &mut HashMap<String, CacheEntry>) {
        if let Some(oldest_key) = cache.iter()
            .min_by_key(|(_, entry)| entry.created_at)
            .map(|(key, _)| key.clone())
        {
            cache.remove(&oldest_key);
        }
    }

    pub async fn cleanup_expired(&self) {
        let mut cache = self.cache.write().await;
        let now = Instant::now();

        cache.retain(|_, entry| entry.expires_at > now);
    }

    pub async fn stats(&self) -> CacheStats {
        let cache = self.cache.read().await;

        CacheStats {
            size: cache.len(),
            capacity: self.max_size,
            hit_ratio: 0.0, // hit/miss accounting is not implemented yet
        }
    }
}
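
// TTL sketch: expired entries read as misses immediately, but their slots
// are only reclaimed by an explicit cleanup_expired() sweep. The short TTL
// here is purely for demonstration.
#[cfg(test)]
mod cache_usage_example {
    use super::*;

    #[tokio::test]
    async fn expired_entries_are_invisible_then_reclaimed() {
        let cache = MessageCache::new(8, Duration::from_millis(5));
        cache.set("k".to_string(), b"v".to_vec()).await;
        assert!(cache.get("k").await.is_some());

        tokio::time::sleep(Duration::from_millis(10)).await;
        // Reads treat the expired entry as absent...
        assert!(cache.get("k").await.is_none());
        // ...but the slot is freed only by the sweep.
        assert_eq!(cache.stats().await.size, 1);
        cache.cleanup_expired().await;
        assert_eq!(cache.stats().await.size, 0);
    }
}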

#[derive(Debug, Clone)]
struct CacheEntry {
    value: Vec<u8>,
    created_at: Instant,
    expires_at: Instant,
    access_count: u64, // written on insert but not yet used by any eviction policy
}

#[derive(Debug, Clone)]
pub struct CacheStats {
    pub size: usize,
    pub capacity: usize,
    pub hit_ratio: f64,
}

pub struct MetricsCollector {
    metrics: Arc<RwLock<HashMap<String, MetricValue>>>,
    start_time: Instant,
}

impl MetricsCollector {
    pub fn new() -> Self {
        Self {
            metrics: Arc::new(RwLock::new(HashMap::new())),
            start_time: Instant::now(),
        }
    }

    pub fn record_metric(&self, name: &str, value: f64, tags: Option<HashMap<String, String>>) {
        let metric = MetricValue {
            value,
            timestamp: Instant::now(),
            tags: tags.unwrap_or_default(),
        };

        // The metrics map sits behind an async RwLock, so this synchronous
        // method defers the write to a spawned task; it must therefore be
        // called from within a Tokio runtime.
        tokio::spawn({
            let metrics = self.metrics.clone();
            let name = name.to_string();
            async move {
                let mut metrics = metrics.write().await;
                metrics.insert(name, metric);
            }
        });
    }

    pub fn get_metrics(&self) -> PerformanceMetrics {
        // Only uptime is live; the remaining fields are placeholders until
        // aggregation over the recorded values is implemented.
        PerformanceMetrics {
            uptime: self.start_time.elapsed(),
            total_requests: 0,
            requests_per_second: 0.0,
            average_response_time: Duration::from_millis(0),
            memory_usage: 0,
            cpu_usage: 0.0,
            active_connections: 0,
            message_throughput: 0.0,
        }
    }
}
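
// Sketch of the collector's async write path: record_metric defers its map
// insert to a spawned task, so it must run inside a Tokio runtime, and
// get_metrics currently reports placeholder counters (only uptime is live).
#[cfg(test)]
mod metrics_usage_example {
    use super::*;

    #[tokio::test]
    async fn record_then_snapshot() {
        let collector = MetricsCollector::new();
        collector.record_metric("latency_ms", 12.5, None);

        // Let the spawned write run before taking a snapshot.
        tokio::task::yield_now().await;

        let snapshot = collector.get_metrics();
        assert_eq!(snapshot.total_requests, 0); // placeholder until aggregation lands
    }
}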

#[derive(Debug, Clone)]
struct MetricValue {
    value: f64,
    timestamp: Instant,
    tags: HashMap<String, String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    pub uptime: Duration,
    pub total_requests: u64,
    pub requests_per_second: f64,
    pub average_response_time: Duration,
    pub memory_usage: u64,
    pub cpu_usage: f64,
    pub active_connections: u32,
    pub message_throughput: f64,
}

#[derive(Debug, thiserror::Error)]
pub enum PerformanceError {
    #[error("Connection pooling is disabled")]
    PoolingDisabled,

    #[error("Connection pool exhausted")]
    PoolExhausted,

    #[error("Message batching is disabled")]
    BatchingDisabled,

    #[error("Cache operation failed: {0}")]
    CacheError(String),

    #[error("Metrics collection failed: {0}")]
    MetricsError(String),
}

pub struct PerformanceProfiler {
    samples: HashMap<String, Vec<Duration>>,
    active_spans: HashMap<String, Instant>,
}

impl PerformanceProfiler {
    pub fn new() -> Self {
        Self {
            samples: HashMap::new(),
            active_spans: HashMap::new(),
        }
    }

    pub fn start_span(&mut self, name: &str) {
        self.active_spans.insert(name.to_string(), Instant::now());
    }

    pub fn end_span(&mut self, name: &str) {
        if let Some(start_time) = self.active_spans.remove(name) {
            let duration = start_time.elapsed();
            self.samples.entry(name.to_string()).or_insert_with(Vec::new).push(duration);
        }
    }

    pub fn get_stats(&self, name: &str) -> Option<SpanStats> {
        // Guard against an empty sample list so the division and the
        // min/max unwraps below cannot panic.
        self.samples
            .get(name)
            .filter(|samples| !samples.is_empty())
            .map(|samples| {
                let sum: Duration = samples.iter().sum();
                let avg = sum / samples.len() as u32;
                let min = *samples.iter().min().unwrap();
                let max = *samples.iter().max().unwrap();

                SpanStats {
                    count: samples.len(),
                    average: avg,
                    min,
                    max,
                    total: sum,
                }
            })
    }
}

#[derive(Debug, Clone)]
pub struct SpanStats {
    pub count: usize,
    pub average: Duration,
    pub min: Duration,
    pub max: Duration,
    pub total: Duration,
}

impl Default for PerformanceProfiler {
    fn default() -> Self {
        Self::new()
    }
}
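
// Span-accumulation sketch: repeated spans under one name fold into a single
// SpanStats. The span name and sleep duration are illustrative.
#[cfg(test)]
mod profiler_usage_example {
    use super::*;

    #[test]
    fn repeated_spans_accumulate() {
        let mut profiler = PerformanceProfiler::new();
        for _ in 0..3 {
            profiler.start_span("encode");
            std::thread::sleep(Duration::from_millis(1));
            profiler.end_span("encode");
        }

        let stats = profiler.get_stats("encode").unwrap();
        assert_eq!(stats.count, 3);
        assert!(stats.min <= stats.average && stats.average <= stats.max);
        assert!(stats.total >= stats.max);
    }
}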

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PerformanceTestData {
    pub id: u64,
    pub payload: Vec<u8>,
    pub timestamp: u64,
    pub priority: MessagePriority,
    pub size_category: SizeCategory,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum MessagePriority {
    Low,
    Normal,
    High,
    Critical,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SizeCategory {
    Small,
    Medium,
    Large,
    Huge,
}

#[derive(Debug, Clone)]
pub struct MemoryPressureConfig {
    pub max_memory_usage: usize,
    pub gc_threshold: f64,
    pub eviction_policy: EvictionPolicy,
    pub compression_threshold: usize,
}

#[derive(Debug, Clone)]
pub enum EvictionPolicy {
    LRU,
    LFU,
    FIFO,
}

impl From<MemoryPressureConfig> for PerformanceConfig {
    fn from(config: MemoryPressureConfig) -> Self {
        // Everything except the compression threshold keeps its default.
        Self {
            compression_threshold: config.compression_threshold,
            ..Self::default()
        }
    }
}

pub struct MemoryMonitor {
    max_memory: usize,
    current_usage: Arc<Mutex<usize>>,
    pressure_threshold: f64,
}

impl MemoryMonitor {
    pub fn new() -> Self {
        Self {
            max_memory: 100 * 1024 * 1024, // 100 MB budget
            current_usage: Arc::new(Mutex::new(0)),
            pressure_threshold: 0.8, // report pressure above 80% of the budget
        }
    }

    pub async fn get_memory_usage(&self) -> f64 {
        let usage = *self.current_usage.lock().unwrap();
        usage as f64 / self.max_memory as f64
    }

    pub async fn is_pressure_detected(&self) -> bool {
        self.get_memory_usage().await > self.pressure_threshold
    }

    pub fn add_memory(&self, size: usize) {
        let mut usage = self.current_usage.lock().unwrap();
        *usage += size;
    }

    pub fn remove_memory(&self, size: usize) {
        let mut usage = self.current_usage.lock().unwrap();
        *usage = usage.saturating_sub(size);
    }
}
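
// Accounting sketch: the monitor only knows about bytes callers report via
// add_memory/remove_memory; the 90 MB and 50 MB figures are illustrative
// against the default 100 MB budget and 0.8 threshold.
#[cfg(test)]
mod memory_monitor_example {
    use super::*;

    #[tokio::test]
    async fn pressure_trips_above_threshold() {
        let monitor = MemoryMonitor::new();
        assert!(!monitor.is_pressure_detected().await);

        // 90 MB of 100 MB is above the 0.8 pressure threshold.
        monitor.add_memory(90 * 1024 * 1024);
        assert!(monitor.is_pressure_detected().await);

        // Releasing 50 MB brings usage back under the threshold.
        monitor.remove_memory(50 * 1024 * 1024);
        assert!(!monitor.is_pressure_detected().await);
    }
}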

pub struct CpuThrottler {
    threshold: Arc<Mutex<f64>>,
    current_usage: Arc<Mutex<f64>>,
}

impl CpuThrottler {
    pub fn new() -> Self {
        Self {
            threshold: Arc::new(Mutex::new(0.8)), // throttle above 80% usage
            current_usage: Arc::new(Mutex::new(0.0)),
        }
    }

    pub async fn set_threshold(&self, threshold: f64) {
        *self.threshold.lock().unwrap() = threshold;
    }

    pub async fn get_cpu_usage(&self) -> f64 {
        *self.current_usage.lock().unwrap()
    }

    pub async fn schedule_task<F, T>(&self, task: F) -> tokio::task::JoinHandle<Result<T, PerformanceError>>
    where
        F: std::future::Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let threshold = *self.threshold.lock().unwrap();
        let current_usage = self.current_usage.clone();

        tokio::spawn(async move {
            // Back off briefly when the recorded usage is above the threshold.
            if *current_usage.lock().unwrap() > threshold {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }

            // Simulate load while the task runs, then clear it afterwards.
            *current_usage.lock().unwrap() = 0.5;
            let result = task.await;
            *current_usage.lock().unwrap() = 0.0;

            Ok(result)
        })
    }
}
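
// Scheduling sketch: tasks always run to completion, but start is briefly
// delayed when the recorded usage exceeds the threshold. The usage values
// are simulated by schedule_task itself, not measured from the OS.
#[cfg(test)]
mod cpu_throttler_example {
    use super::*;

    #[tokio::test]
    async fn scheduled_task_completes_and_clears_usage() {
        let throttler = CpuThrottler::new();
        throttler.set_threshold(0.9).await;

        let handle = throttler.schedule_task(async { 21 * 2 }).await;
        assert_eq!(handle.await.unwrap().unwrap(), 42);

        // The simulated usage is reset once the task finishes.
        assert_eq!(throttler.get_cpu_usage().await, 0.0);
    }
}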

pub struct NetworkOptimizer {
    compression_enabled: bool,
    compression_threshold: usize,
}

impl NetworkOptimizer {
    pub fn new() -> Self {
        Self {
            compression_enabled: true,
            compression_threshold: 1024, // only consider payloads of 1 KiB or more
        }
    }

    pub async fn optimize(&self, data: &[u8]) -> Result<Vec<u8>, PerformanceError> {
        if self.compression_enabled && data.len() >= self.compression_threshold {
            // Placeholder: a real implementation would compress here; the
            // payload is currently passed through unchanged.
            Ok(data.to_vec())
        } else {
            Ok(data.to_vec())
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_connection_pool() {
        let config = ConnectionPoolConfig {
            max_connections: 2,
            min_connections: 0,
        };
        let pool = ConnectionPool::new(config).await.unwrap();

        let conn1 = pool.get_connection("ws://localhost:8080").await.unwrap();
        let _conn2 = pool.get_connection("ws://localhost:8080").await.unwrap();

        // The pool is at capacity, so a third checkout fails.
        assert!(pool.get_connection("ws://localhost:8080").await.is_err());

        // Returning a healthy connection frees capacity again.
        pool.return_connection(conn1).await;
        assert!(pool.get_connection("ws://localhost:8080").await.is_ok());
    }

    #[tokio::test]
    async fn test_message_batcher() {
        let batcher = MessageBatcher::new(3, Duration::from_millis(100));

        batcher.add_message(b"message1".to_vec()).await.unwrap();
        batcher.add_message(b"message2".to_vec()).await.unwrap();

        assert_eq!(batcher.pending_count(), 2);

        // The third message reaches batch_size and triggers an automatic flush.
        batcher.add_message(b"message3".to_vec()).await.unwrap();
        assert_eq!(batcher.pending_count(), 0);
    }

    #[tokio::test]
    async fn test_message_cache() {
        let cache = MessageCache::new(2, Duration::from_secs(1));

        cache.set("key1".to_string(), b"value1".to_vec()).await;
        cache.set("key2".to_string(), b"value2".to_vec()).await;

        assert_eq!(cache.get("key1").await, Some(b"value1".to_vec()));
        assert_eq!(cache.get("key2").await, Some(b"value2".to_vec()));

        // Inserting a third key evicts the oldest entry to stay at capacity.
        cache.set("key3".to_string(), b"value3".to_vec()).await;

        let stats = cache.stats().await;
        assert_eq!(stats.size, 2);
    }

    #[test]
    fn test_profiler() {
        let mut profiler = PerformanceProfiler::new();

        profiler.start_span("test_operation");
        std::thread::sleep(Duration::from_millis(10));
        profiler.end_span("test_operation");

        let stats = profiler.get_stats("test_operation").unwrap();
        assert_eq!(stats.count, 1);
        assert!(stats.average >= Duration::from_millis(10));
    }
}