#![allow(missing_docs)]

//! Performance optimization utilities: zero-copy serialization, connection
//! pooling, TTL caching, batching, concurrency limiting, and lightweight
//! operation timing.

use super::*;
use bytes::{Bytes, BytesMut};
use parking_lot::RwLock as PLRwLock;
use std::{
    collections::HashMap,
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::{Semaphore, mpsc};

/// Top-level performance tuning configuration.
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
    /// Maximum number of operations allowed to run concurrently.
    pub max_concurrent_ops: usize,

    /// Maximum number of pooled connections.
    pub connection_pool_size: usize,

    /// Cache tuning parameters.
    pub cache_config: CacheConfig,

    /// Serialization tuning parameters.
    pub serialization: SerializationConfig,

    /// Batching tuning parameters.
    pub batch_config: BatchConfig,
}

impl Default for PerformanceConfig {
    fn default() -> Self {
        Self {
            max_concurrent_ops: 1000,
            connection_pool_size: 100,
            cache_config: CacheConfig::default(),
            serialization: SerializationConfig::default(),
            batch_config: BatchConfig::default(),
        }
    }
}

/// Configuration for [`PerformanceCache`].
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Maximum number of entries kept before eviction kicks in.
    pub max_entries: usize,

    /// Time-to-live after which an entry is considered expired.
    pub ttl: Duration,

    /// Whether cached payloads should be compressed.
    pub compression: bool,
}

impl Default for CacheConfig {
    fn default() -> Self {
        Self {
            max_entries: 10_000,
            ttl: Duration::from_secs(300),
            compression: true,
        }
    }
}

/// Configuration for [`OptimizedSerializer`].
#[derive(Debug, Clone)]
pub struct SerializationConfig {
    /// Prefer zero-copy buffer handling where possible.
    pub zero_copy: bool,

    /// Initial capacity of pooled serialization buffers, in bytes.
    pub buffer_size: usize,

    /// Gzip-compress serialized payloads.
    pub compression: bool,
}

impl Default for SerializationConfig {
    fn default() -> Self {
        Self {
            zero_copy: true,
            buffer_size: 4096,
            compression: false,
        }
    }
}

/// Configuration for [`BatchProcessor`].
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of items collected into a single batch.
    pub max_batch_size: usize,

    /// Maximum time to wait before flushing a partially filled batch.
    pub batch_timeout: Duration,

    /// Automatically batch items as they arrive.
    pub auto_batch: bool,
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self {
            max_batch_size: 100,
            batch_timeout: Duration::from_millis(10),
            auto_batch: true,
        }
    }
}

/// A message wrapper around reference-counted [`Bytes`] that can be cloned
/// and passed around without copying the underlying payload.
pub struct ZeroCopyMessage {
    data: Bytes,
}

impl ZeroCopyMessage {
    /// Wraps an existing byte buffer without copying it.
    pub fn new(data: Bytes) -> Self {
        Self { data }
    }

    /// Returns the raw payload bytes.
    pub fn as_bytes(&self) -> &[u8] {
        &self.data
    }

    /// Deserializes the payload with bincode.
    pub fn deserialize<T: serde::de::DeserializeOwned>(&self) -> Result<T> {
        bincode::deserialize(&self.data).map_err(AdaptiveNetworkError::Serialization)
    }
}

/// Serializer that reuses pooled buffers and optionally gzip-compresses its
/// output.
pub struct OptimizedSerializer {
    buffer_pool: Arc<PLRwLock<Vec<BytesMut>>>,
    config: SerializationConfig,
}

impl OptimizedSerializer {
    /// Creates a serializer with an empty buffer pool.
    pub fn new(config: SerializationConfig) -> Self {
        Self {
            buffer_pool: Arc::new(PLRwLock::new(Vec::new())),
            config,
        }
    }

    /// Serializes `value` with bincode, reusing a pooled buffer when one is
    /// available, and compresses the result if the config enables it.
    pub fn serialize<T: serde::Serialize>(&self, value: &T) -> Result<Bytes> {
        // Reuse a pooled buffer if available, otherwise allocate one.
        let mut buffer = self
            .buffer_pool
            .write()
            .pop()
            .unwrap_or_else(|| BytesMut::with_capacity(self.config.buffer_size));

        buffer.clear();

        let serialized = bincode::serialize(value).map_err(AdaptiveNetworkError::Serialization)?;
        buffer.extend_from_slice(&serialized);

        let bytes = if self.config.compression {
            let compressed = self.compress(&buffer)?;
            // Return the buffer to the pool unless it has grown far beyond
            // the configured size.
            if buffer.capacity() <= self.config.buffer_size * 2 {
                self.buffer_pool.write().push(buffer);
            }
            compressed
        } else {
            // Hand the buffer itself to the caller without copying; it is
            // not returned to the pool in this case.
            buffer.freeze()
        };

        Ok(bytes)
    }

    /// Gzip-compresses `data` using the fastest compression level.
    fn compress(&self, data: &[u8]) -> Result<Bytes> {
        use flate2::Compression;
        use flate2::write::GzEncoder;
        use std::io::Write;

        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(data)?;
        Ok(Bytes::from(encoder.finish()?))
    }
}

/// A simple keyed connection pool with a global permit budget on checkouts
/// and a per-host cap on idle connections.
pub struct ConnectionPool<T> {
    connections: Arc<PLRwLock<HashMap<String, Vec<T>>>>,
    semaphore: Arc<Semaphore>,
    max_per_host: usize,
}

impl<T: Send> ConnectionPool<T> {
    /// Creates a pool allowing at most `max_connections` concurrent checkouts
    /// and keeping at most `max_per_host` idle connections per host.
    pub fn new(max_connections: usize, max_per_host: usize) -> Self {
        Self {
            connections: Arc::new(PLRwLock::new(HashMap::new())),
            semaphore: Arc::new(Semaphore::new(max_connections)),
            max_per_host,
        }
    }

    /// Takes an idle connection for `host`, if one is available.
    pub async fn get(&self, host: &str) -> Option<T> {
        let _permit = self.semaphore.acquire().await.ok()?;
        self.connections
            .write()
            .get_mut(host)
            .and_then(|conns| conns.pop())
    }

    /// Returns a connection to the pool, dropping it if the per-host cap has
    /// already been reached.
    pub fn put(&self, host: String, conn: T) {
        let mut pool = self.connections.write();
        let conns = pool.entry(host).or_default();

        if conns.len() < self.max_per_host {
            conns.push(conn);
        }
    }
}

/// A TTL-based in-memory cache with a bounded number of entries.
pub struct PerformanceCache<K, V> {
    entries: Arc<PLRwLock<HashMap<K, CacheEntry<V>>>>,
    config: CacheConfig,
}

/// A cached value together with its insertion time.
#[derive(Clone)]
struct CacheEntry<V> {
    value: V,
    inserted_at: Instant,
}

impl<K: Eq + std::hash::Hash + Clone, V: Clone> PerformanceCache<K, V> {
    /// Creates an empty cache with the given configuration.
    pub fn new(config: CacheConfig) -> Self {
        Self {
            entries: Arc::new(PLRwLock::new(HashMap::new())),
            config,
        }
    }

    /// Returns a clone of the cached value if it exists and has not expired.
    pub fn get(&self, key: &K) -> Option<V> {
        let entries = self.entries.read();
        entries.get(key).and_then(|entry| {
            if entry.inserted_at.elapsed() < self.config.ttl {
                Some(entry.value.clone())
            } else {
                None
            }
        })
    }

    /// Inserts a value, evicting expired entries (and, if the cache is still
    /// full, the oldest entry) once `max_entries` is reached.
    pub fn insert(&self, key: K, value: V) {
        let mut entries = self.entries.write();

        if entries.len() >= self.config.max_entries {
            // Drop expired entries first.
            entries.retain(|_, entry| entry.inserted_at.elapsed() < self.config.ttl);

            // Still full: evict the oldest entry.
            if entries.len() >= self.config.max_entries
                && let Some(oldest_key) = entries
                    .iter()
                    .min_by_key(|(_, entry)| entry.inserted_at)
                    .map(|(k, _)| k.clone())
            {
                entries.remove(&oldest_key);
            }
        }

        entries.insert(
            key,
            CacheEntry {
                value,
                inserted_at: Instant::now(),
            },
        );
    }

    /// Removes all expired entries.
    pub fn evict_expired(&self) {
        let mut entries = self.entries.write();
        entries.retain(|_, entry| entry.inserted_at.elapsed() < self.config.ttl);
    }
}

/// Collects items from an internal channel and hands them to a callback in
/// size- and time-bounded batches.
pub struct BatchProcessor<T> {
    config: BatchConfig,
    tx: mpsc::Sender<T>,
    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<T>>>,
}

impl<T: Send + 'static> BatchProcessor<T> {
    /// Creates a processor whose channel buffers up to ten full batches.
    pub fn new(config: BatchConfig) -> Self {
        let (tx, rx) = mpsc::channel(config.max_batch_size * 10);
        Self {
            config,
            tx,
            rx: Arc::new(tokio::sync::Mutex::new(rx)),
        }
    }

    /// Queues an item for the next batch.
    pub async fn add(&self, item: T) -> Result<()> {
        self.tx
            .send(item)
            .await
            .map_err(|_| AdaptiveNetworkError::Other("Batch processor closed".to_string()))?;
        Ok(())
    }

    /// Collects items until the batch is full, the channel closes, or the
    /// batch timeout elapses, then invokes `f` with whatever was collected.
    pub async fn process_batch<F, Fut>(&self, mut f: F) -> Result<()>
    where
        F: FnMut(Vec<T>) -> Fut,
        Fut: std::future::Future<Output = Result<()>>,
    {
        let mut rx = self.rx.lock().await;
        let mut batch = Vec::with_capacity(self.config.max_batch_size);

        let timeout = tokio::time::sleep(self.config.batch_timeout);
        tokio::pin!(timeout);

        loop {
            tokio::select! {
                maybe_item = rx.recv() => {
                    match maybe_item {
                        Some(item) => {
                            batch.push(item);
                            if batch.len() >= self.config.max_batch_size {
                                break;
                            }
                        }
                        // Channel closed: stop collecting and flush what we have.
                        None => break,
                    }
                }
                _ = &mut timeout => {
                    // Batch window elapsed: stop collecting. Breaking here also
                    // avoids polling the already-elapsed sleep again.
                    break;
                }
            }
        }

        if !batch.is_empty() {
            f(batch).await?;
        }

        Ok(())
    }
}

/// Limits the number of operations running concurrently via a semaphore.
#[derive(Clone)]
pub struct ConcurrencyLimiter {
    semaphore: Arc<Semaphore>,
}

impl ConcurrencyLimiter {
    /// Creates a limiter allowing at most `max_concurrent` operations.
    pub fn new(max_concurrent: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
        }
    }

    /// Runs a single operation once a permit is available.
    pub async fn execute<F, Fut, T>(&self, f: F) -> Result<T>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        let _permit = self
            .semaphore
            .acquire()
            .await
            .map_err(|_| AdaptiveNetworkError::Other("Semaphore closed".to_string()))?;
        f().await
    }

    /// Runs all operations, each gated by a permit, and collects their
    /// results. Operations whose permit cannot be acquired (closed semaphore)
    /// are silently skipped.
    pub async fn execute_many<F, Fut, T>(&self, operations: Vec<F>) -> Vec<Result<T>>
    where
        F: FnOnce() -> Fut + Send,
        Fut: std::future::Future<Output = Result<T>> + Send,
        T: Send,
    {
        let futures = operations.into_iter().map(|op| {
            let semaphore = self.semaphore.clone();
            async move {
                let _permit = semaphore.acquire().await.ok()?;
                Some(op().await)
            }
        });

        futures::future::join_all(futures)
            .await
            .into_iter()
            .flatten()
            .collect()
    }
}

/// Records wall-clock durations of named operations and derives summary
/// statistics from them.
#[derive(Debug)]
pub struct PerformanceMonitor {
    operation_times: Arc<PLRwLock<HashMap<String, Vec<Duration>>>>,
    start_times: Arc<PLRwLock<HashMap<String, Instant>>>,
}

impl Default for PerformanceMonitor {
    fn default() -> Self {
        Self::new()
    }
}

impl PerformanceMonitor {
    /// Creates an empty monitor.
    pub fn new() -> Self {
        Self {
            operation_times: Arc::new(PLRwLock::new(HashMap::new())),
            start_times: Arc::new(PLRwLock::new(HashMap::new())),
        }
    }

    /// Marks the start of the named operation.
    pub fn start_operation(&self, name: &str) {
        self.start_times
            .write()
            .insert(name.to_string(), Instant::now());
    }

    /// Marks the end of the named operation and records its duration.
    pub fn end_operation(&self, name: &str) {
        if let Some(start) = self.start_times.write().remove(name) {
            let duration = start.elapsed();
            self.operation_times
                .write()
                .entry(name.to_string())
                .or_default()
                .push(duration);
        }
    }

    /// Returns summary statistics for the named operation, if any samples
    /// have been recorded.
    pub fn get_stats(&self, name: &str) -> Option<PerformanceStats> {
        let times = self.operation_times.read();
        times.get(name).map(|durations| {
            let total: Duration = durations.iter().sum();
            let count = durations.len();
            let avg = total / count as u32;

            let mut sorted = durations.clone();
            sorted.sort();

            PerformanceStats {
                count,
                avg_duration: avg,
                min_duration: sorted.first().copied().unwrap_or_default(),
                max_duration: sorted.last().copied().unwrap_or_default(),
                p50_duration: sorted.get(count / 2).copied().unwrap_or_default(),
                p99_duration: sorted.get(count * 99 / 100).copied().unwrap_or_default(),
            }
        })
    }
}

/// Summary statistics for a monitored operation.
#[derive(Debug, Clone)]
pub struct PerformanceStats {
    pub count: usize,
    pub avg_duration: Duration,
    pub min_duration: Duration,
    pub max_duration: Duration,
    pub p50_duration: Duration,
    pub p99_duration: Duration,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_zero_copy_message() {
        let data = Bytes::from(vec![1, 2, 3, 4]);
        let msg = ZeroCopyMessage::new(data);
        assert_eq!(msg.as_bytes(), &[1, 2, 3, 4]);
    }

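    // A minimal round-trip sketch for OptimizedSerializer + ZeroCopyMessage:
    // with compression disabled the output is plain bincode, so it can be
    // read back directly through ZeroCopyMessage::deserialize. The test name
    // and the chosen buffer_size are illustrative.
    #[test]
    fn test_serializer_round_trip() {
        let serializer = OptimizedSerializer::new(SerializationConfig {
            zero_copy: true,
            buffer_size: 64,
            compression: false,
        });

        let original = vec![1u32, 2, 3];
        let bytes = serializer.serialize(&original).unwrap();
        let msg = ZeroCopyMessage::new(bytes);
        let decoded: Vec<u32> = msg.deserialize().unwrap();
        assert_eq!(decoded, original);
    }
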
    #[tokio::test]
    async fn test_batch_processor() {
        let processor = BatchProcessor::new(BatchConfig {
            max_batch_size: 3,
            batch_timeout: Duration::from_millis(100),
            auto_batch: true,
        });

        processor.add(1).await.unwrap();
        processor.add(2).await.unwrap();
        processor.add(3).await.unwrap();

        processor
            .process_batch(|batch| async move {
                assert_eq!(batch.len(), 3);
                assert_eq!(batch, vec![1, 2, 3]);
                Ok::<_, AdaptiveNetworkError>(())
            })
            .await
            .unwrap();
    }

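    // An illustrative sketch of ConnectionPool reuse: a connection put back
    // for a host is handed out again, and anything beyond max_per_host is
    // dropped on put. The test name and values are arbitrary.
    #[tokio::test]
    async fn test_connection_pool_reuse() {
        let pool: ConnectionPool<u32> = ConnectionPool::new(4, 1);

        pool.put("host-a".to_string(), 7);
        // Exceeds max_per_host for "host-a", so this connection is dropped.
        pool.put("host-a".to_string(), 8);

        assert_eq!(pool.get("host-a").await, Some(7));
        assert_eq!(pool.get("host-a").await, None);
    }
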
    #[tokio::test]
    async fn test_concurrency_limiter() {
        let limiter = ConcurrencyLimiter::new(2);
        let counter = Arc::new(tokio::sync::Mutex::new(0));

        let operations: Vec<_> = (0..5)
            .map(|_| {
                let counter = counter.clone();
                let limiter = limiter.clone();

                tokio::spawn(async move {
                    limiter
                        .execute(|| async {
                            let mut count = counter.lock().await;
                            *count += 1;
                            Ok::<_, AdaptiveNetworkError>(())
                        })
                        .await
                })
            })
            .collect();

        for op in operations {
            op.await.unwrap().unwrap();
        }

        assert_eq!(*counter.lock().await, 5);
    }

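    // An illustrative sketch of execute_many: every operation runs under the
    // shared permit budget and one result comes back per operation while the
    // semaphore stays open. The test name and values are arbitrary.
    #[tokio::test]
    async fn test_concurrency_limiter_execute_many() {
        let limiter = ConcurrencyLimiter::new(2);

        let operations: Vec<_> = (0..4)
            .map(|i| move || async move { Ok::<_, AdaptiveNetworkError>(i * 2) })
            .collect();

        let results = limiter.execute_many(operations).await;
        assert_eq!(results.len(), 4);
        for (i, result) in results.into_iter().enumerate() {
            assert_eq!(result.unwrap(), i as i32 * 2);
        }
    }
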
    #[test]
    fn test_performance_cache() {
        let cache = PerformanceCache::new(CacheConfig {
            max_entries: 2,
            ttl: Duration::from_secs(1),
            compression: false,
        });

        cache.insert("key1", "value1");
        cache.insert("key2", "value2");

        assert_eq!(cache.get(&"key1"), Some("value1"));
        assert_eq!(cache.get(&"key2"), Some("value2"));

        // Inserting a third entry triggers eviction of the oldest entry.
        cache.insert("key3", "value3");
        assert_eq!(cache.get(&"key3"), Some("value3"));
    }

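    // An illustrative sketch of TTL expiry: an entry is visible before the
    // TTL elapses and gone afterwards. The short TTL and sleep lengths are
    // arbitrary test values.
    #[test]
    fn test_performance_cache_ttl_expiry() {
        let cache = PerformanceCache::new(CacheConfig {
            max_entries: 10,
            ttl: Duration::from_millis(20),
            compression: false,
        });

        cache.insert("key", 42u32);
        assert_eq!(cache.get(&"key"), Some(42));

        std::thread::sleep(Duration::from_millis(40));
        assert_eq!(cache.get(&"key"), None);
    }
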
    #[test]
    fn test_performance_monitor() {
        let monitor = PerformanceMonitor::new();

        monitor.start_operation("test_op");
        std::thread::sleep(Duration::from_millis(10));
        monitor.end_operation("test_op");

        let stats = monitor.get_stats("test_op").unwrap();
        assert_eq!(stats.count, 1);
        assert!(stats.avg_duration >= Duration::from_millis(10));
    }
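
    // An illustrative sketch of the derived statistics: with several samples
    // recorded, the ordering min <= p50 <= p99 <= max should hold. The sample
    // count and sleep lengths are arbitrary.
    #[test]
    fn test_performance_monitor_percentiles() {
        let monitor = PerformanceMonitor::new();

        for i in 1..=5u64 {
            monitor.start_operation("multi_op");
            std::thread::sleep(Duration::from_millis(i));
            monitor.end_operation("multi_op");
        }

        let stats = monitor.get_stats("multi_op").unwrap();
        assert_eq!(stats.count, 5);
        assert!(stats.min_duration <= stats.p50_duration);
        assert!(stats.p50_duration <= stats.p99_duration);
        assert!(stats.p99_duration <= stats.max_duration);
    }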
}