saorsa_core/adaptive/
performance.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Performance optimization utilities for the Adaptive P2P Network
15//!
16//! This module provides tools and optimizations for improving network performance:
17//! - Zero-copy message handling
18//! - Optimized serialization
19//! - Connection pooling
20//! - Caching strategies
21//! - Concurrent operation tuning
22
23#![allow(missing_docs)]
24
25use super::*;
26use bytes::{Bytes, BytesMut};
27use parking_lot::RwLock as PLRwLock;
28use std::{
29    collections::HashMap,
30    sync::Arc,
31    time::{Duration, Instant},
32};
33use tokio::sync::{Semaphore, mpsc};
34
35/// Performance configuration
36#[derive(Debug, Clone)]
37pub struct PerformanceConfig {
38    /// Maximum concurrent operations
39    pub max_concurrent_ops: usize,
40
41    /// Connection pool size
42    pub connection_pool_size: usize,
43
44    /// Cache configuration
45    pub cache_config: CacheConfig,
46
47    /// Serialization settings
48    pub serialization: SerializationConfig,
49
50    /// Batch operation settings
51    pub batch_config: BatchConfig,
52}
53
54impl Default for PerformanceConfig {
55    fn default() -> Self {
56        Self {
57            max_concurrent_ops: 1000,
58            connection_pool_size: 100,
59            cache_config: CacheConfig::default(),
60            serialization: SerializationConfig::default(),
61            batch_config: BatchConfig::default(),
62        }
63    }
64}
65
66/// Cache configuration
67#[derive(Debug, Clone)]
68pub struct CacheConfig {
69    /// Maximum cache entries
70    pub max_entries: usize,
71
72    /// Cache TTL
73    pub ttl: Duration,
74
75    /// Enable compression
76    pub compression: bool,
77}
78
79impl Default for CacheConfig {
80    fn default() -> Self {
81        Self {
82            max_entries: 10_000,
83            ttl: Duration::from_secs(300),
84            compression: true,
85        }
86    }
87}
88
89/// Serialization configuration
90#[derive(Debug, Clone)]
91pub struct SerializationConfig {
92    /// Use zero-copy deserialization
93    pub zero_copy: bool,
94
95    /// Pre-allocated buffer size
96    pub buffer_size: usize,
97
98    /// Use compression
99    pub compression: bool,
100}
101
102impl Default for SerializationConfig {
103    fn default() -> Self {
104        Self {
105            zero_copy: true,
106            buffer_size: 4096,
107            compression: false,
108        }
109    }
110}
111
112/// Batch operation configuration
113#[derive(Debug, Clone)]
114pub struct BatchConfig {
115    /// Maximum batch size
116    pub max_batch_size: usize,
117
118    /// Batch timeout
119    pub batch_timeout: Duration,
120
121    /// Enable automatic batching
122    pub auto_batch: bool,
123}
124
125impl Default for BatchConfig {
126    fn default() -> Self {
127        Self {
128            max_batch_size: 100,
129            batch_timeout: Duration::from_millis(10),
130            auto_batch: true,
131        }
132    }
133}
134
135/// Zero-copy message wrapper
136pub struct ZeroCopyMessage {
137    data: Bytes,
138}
139
140impl ZeroCopyMessage {
141    /// Create from bytes
142    pub fn new(data: Bytes) -> Self {
143        Self { data }
144    }
145
146    /// Get reference to data
147    pub fn as_bytes(&self) -> &[u8] {
148        &self.data
149    }
150
151    /// Deserialize without copying
152    pub fn deserialize<T: serde::de::DeserializeOwned>(&self) -> Result<T> {
153        bincode::deserialize(&self.data).map_err(AdaptiveNetworkError::Serialization)
154    }
155}
156
157/// Optimized serializer with buffer reuse
158pub struct OptimizedSerializer {
159    buffer_pool: Arc<PLRwLock<Vec<BytesMut>>>,
160    config: SerializationConfig,
161}
162
163impl OptimizedSerializer {
164    pub fn new(config: SerializationConfig) -> Self {
165        Self {
166            buffer_pool: Arc::new(PLRwLock::new(Vec::new())),
167            config,
168        }
169    }
170
171    /// Serialize with buffer reuse
172    pub fn serialize<T: serde::Serialize>(&self, value: &T) -> Result<Bytes> {
173        // Get buffer from pool or create new
174        let mut buffer = self
175            .buffer_pool
176            .write()
177            .pop()
178            .unwrap_or_else(|| BytesMut::with_capacity(self.config.buffer_size));
179
180        buffer.clear();
181
182        // Serialize to buffer
183        let serialized = bincode::serialize(value).map_err(AdaptiveNetworkError::Serialization)?;
184        buffer.extend_from_slice(&serialized);
185
186        // Optional compression
187        let bytes = if self.config.compression {
188            let compressed = self.compress(&buffer)?;
189            // Return buffer to pool after compression
190            if buffer.capacity() <= self.config.buffer_size * 2 {
191                self.buffer_pool.write().push(buffer);
192            }
193            compressed
194        } else {
195            // freeze() consumes the buffer, so we can't return it to the pool
196            buffer.freeze()
197        };
198
199        Ok(bytes)
200    }
201
202    fn compress(&self, data: &[u8]) -> Result<Bytes> {
203        use flate2::Compression;
204        use flate2::write::GzEncoder;
205        use std::io::Write;
206
207        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
208        encoder.write_all(data)?;
209        Ok(Bytes::from(encoder.finish()?))
210    }
211}
212
213/// Connection pool for reusing connections
214pub struct ConnectionPool<T> {
215    connections: Arc<PLRwLock<HashMap<String, Vec<T>>>>,
216    semaphore: Arc<Semaphore>,
217    max_per_host: usize,
218}
219
220impl<T: Send> ConnectionPool<T> {
221    pub fn new(max_connections: usize, max_per_host: usize) -> Self {
222        Self {
223            connections: Arc::new(PLRwLock::new(HashMap::new())),
224            semaphore: Arc::new(Semaphore::new(max_connections)),
225            max_per_host,
226        }
227    }
228
229    /// Get connection from pool
230    pub async fn get(&self, host: &str) -> Option<T> {
231        let _permit = self.semaphore.acquire().await.ok()?;
232        self.connections
233            .write()
234            .get_mut(host)
235            .and_then(|conns| conns.pop())
236    }
237
238    /// Return connection to pool
239    pub fn put(&self, host: String, conn: T) {
240        let mut pool = self.connections.write();
241        let conns = pool.entry(host).or_default();
242
243        if conns.len() < self.max_per_host {
244            conns.push(conn);
245        }
246    }
247}
248
249/// High-performance cache with TTL
250pub struct PerformanceCache<K, V> {
251    entries: Arc<PLRwLock<HashMap<K, CacheEntry<V>>>>,
252    config: CacheConfig,
253}
254
255#[derive(Clone)]
256struct CacheEntry<V> {
257    value: V,
258    inserted_at: Instant,
259}
260
261impl<K: Eq + std::hash::Hash + Clone, V: Clone> PerformanceCache<K, V> {
262    pub fn new(config: CacheConfig) -> Self {
263        Self {
264            entries: Arc::new(PLRwLock::new(HashMap::new())),
265            config,
266        }
267    }
268
269    /// Get value from cache
270    pub fn get(&self, key: &K) -> Option<V> {
271        let entries = self.entries.read();
272        entries.get(key).and_then(|entry| {
273            if entry.inserted_at.elapsed() < self.config.ttl {
274                Some(entry.value.clone())
275            } else {
276                None
277            }
278        })
279    }
280
281    /// Insert value into cache
282    pub fn insert(&self, key: K, value: V) {
283        let mut entries = self.entries.write();
284
285        // Evict old entries if at capacity
286        if entries.len() >= self.config.max_entries {
287            let _now = Instant::now();
288            entries.retain(|_, entry| entry.inserted_at.elapsed() < self.config.ttl);
289
290            // If still over capacity, remove oldest
291            if entries.len() >= self.config.max_entries
292                && let Some(oldest_key) = entries
293                    .iter()
294                    .min_by_key(|(_, entry)| entry.inserted_at)
295                    .map(|(k, _)| k.clone())
296            {
297                entries.remove(&oldest_key);
298            }
299        }
300
301        entries.insert(
302            key,
303            CacheEntry {
304                value,
305                inserted_at: Instant::now(),
306            },
307        );
308    }
309
310    /// Clear expired entries
311    pub fn evict_expired(&self) {
312        let mut entries = self.entries.write();
313        let _now = Instant::now();
314        entries.retain(|_, entry| entry.inserted_at.elapsed() < self.config.ttl);
315    }
316}
317
318/// Batch processor for aggregating operations
319pub struct BatchProcessor<T> {
320    config: BatchConfig,
321    tx: mpsc::Sender<T>,
322    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<T>>>,
323}
324
325impl<T: Send + 'static> BatchProcessor<T> {
326    pub fn new(config: BatchConfig) -> Self {
327        let (tx, rx) = mpsc::channel(config.max_batch_size * 10);
328        Self {
329            config,
330            tx,
331            rx: Arc::new(tokio::sync::Mutex::new(rx)),
332        }
333    }
334
335    /// Add item to batch
336    pub async fn add(&self, item: T) -> Result<()> {
337        self.tx
338            .send(item)
339            .await
340            .map_err(|_| AdaptiveNetworkError::Other("Batch processor closed".to_string()))?;
341        Ok(())
342    }
343
344    /// Process batch with given function
345    pub async fn process_batch<F, Fut>(&self, mut f: F) -> Result<()>
346    where
347        F: FnMut(Vec<T>) -> Fut,
348        Fut: std::future::Future<Output = Result<()>>,
349    {
350        let mut rx = self.rx.lock().await;
351        let mut batch = Vec::with_capacity(self.config.max_batch_size);
352
353        // Collect items up to batch size or timeout
354        let timeout = tokio::time::sleep(self.config.batch_timeout);
355        tokio::pin!(timeout);
356
357        loop {
358            tokio::select! {
359                Some(item) = rx.recv() => {
360                    batch.push(item);
361                    if batch.len() >= self.config.max_batch_size {
362                        break;
363                    }
364                }
365                _ = &mut timeout => {
366                    if !batch.is_empty() {
367                        break;
368                    }
369                }
370            }
371        }
372
373        if !batch.is_empty() {
374            f(batch).await?;
375        }
376
377        Ok(())
378    }
379}
380
381/// Concurrent operation limiter
382#[derive(Clone)]
383pub struct ConcurrencyLimiter {
384    semaphore: Arc<Semaphore>,
385}
386
387impl ConcurrencyLimiter {
388    pub fn new(max_concurrent: usize) -> Self {
389        Self {
390            semaphore: Arc::new(Semaphore::new(max_concurrent)),
391        }
392    }
393
394    /// Execute operation with concurrency limit
395    pub async fn execute<F, Fut, T>(&self, f: F) -> Result<T>
396    where
397        F: FnOnce() -> Fut,
398        Fut: std::future::Future<Output = Result<T>>,
399    {
400        let _permit = self
401            .semaphore
402            .acquire()
403            .await
404            .map_err(|_| AdaptiveNetworkError::Other("Semaphore closed".to_string()))?;
405        f().await
406    }
407
408    /// Execute many operations with concurrency limit
409    pub async fn execute_many<F, Fut, T>(&self, operations: Vec<F>) -> Vec<Result<T>>
410    where
411        F: FnOnce() -> Fut + Send,
412        Fut: std::future::Future<Output = Result<T>> + Send,
413        T: Send,
414    {
415        let futures = operations.into_iter().map(|op| {
416            let semaphore = self.semaphore.clone();
417            async move {
418                let _permit = semaphore.acquire().await.ok()?;
419                Some(op().await)
420            }
421        });
422
423        futures::future::join_all(futures)
424            .await
425            .into_iter()
426            .flatten()
427            .collect()
428    }
429}
430
431/// Performance monitoring and metrics
432#[derive(Debug)]
433pub struct PerformanceMonitor {
434    operation_times: Arc<PLRwLock<HashMap<String, Vec<Duration>>>>,
435    start_times: Arc<PLRwLock<HashMap<String, Instant>>>,
436}
437
438impl Default for PerformanceMonitor {
439    fn default() -> Self {
440        Self::new()
441    }
442}
443
444impl PerformanceMonitor {
445    pub fn new() -> Self {
446        Self {
447            operation_times: Arc::new(PLRwLock::new(HashMap::new())),
448            start_times: Arc::new(PLRwLock::new(HashMap::new())),
449        }
450    }
451
452    /// Start timing an operation
453    pub fn start_operation(&self, name: &str) {
454        self.start_times
455            .write()
456            .insert(name.to_string(), Instant::now());
457    }
458
459    /// End timing an operation
460    pub fn end_operation(&self, name: &str) {
461        if let Some(start) = self.start_times.write().remove(name) {
462            let duration = start.elapsed();
463            self.operation_times
464                .write()
465                .entry(name.to_string())
466                .or_default()
467                .push(duration);
468        }
469    }
470
471    /// Get performance statistics
472    pub fn get_stats(&self, name: &str) -> Option<PerformanceStats> {
473        let times = self.operation_times.read();
474        times.get(name).map(|durations| {
475            let total: Duration = durations.iter().sum();
476            let count = durations.len();
477            let avg = total / count as u32;
478
479            let mut sorted = durations.clone();
480            sorted.sort();
481
482            PerformanceStats {
483                count,
484                avg_duration: avg,
485                min_duration: sorted.first().copied().unwrap_or_default(),
486                max_duration: sorted.last().copied().unwrap_or_default(),
487                p50_duration: sorted.get(count / 2).copied().unwrap_or_default(),
488                p99_duration: sorted.get(count * 99 / 100).copied().unwrap_or_default(),
489            }
490        })
491    }
492}
493
494#[derive(Debug, Clone)]
495pub struct PerformanceStats {
496    pub count: usize,
497    pub avg_duration: Duration,
498    pub min_duration: Duration,
499    pub max_duration: Duration,
500    pub p50_duration: Duration,
501    pub p99_duration: Duration,
502}
503
504#[cfg(test)]
505mod tests {
506    use super::*;
507
508    #[test]
509    fn test_zero_copy_message() {
510        let data = Bytes::from(vec![1, 2, 3, 4]);
511        let msg = ZeroCopyMessage::new(data);
512        assert_eq!(msg.as_bytes(), &[1, 2, 3, 4]);
513    }
514
515    #[tokio::test]
516    async fn test_batch_processor() {
517        let processor = BatchProcessor::new(BatchConfig {
518            max_batch_size: 3,
519            batch_timeout: Duration::from_millis(100),
520            auto_batch: true,
521        });
522
523        // Add items
524        processor.add(1).await.unwrap();
525        processor.add(2).await.unwrap();
526        processor.add(3).await.unwrap();
527
528        // Process batch
529        processor
530            .process_batch(|batch| async move {
531                assert_eq!(batch.len(), 3);
532                assert_eq!(batch, vec![1, 2, 3]);
533                Ok::<_, AdaptiveNetworkError>(())
534            })
535            .await
536            .unwrap();
537    }
538
539    #[tokio::test]
540    async fn test_concurrency_limiter() {
541        let limiter = ConcurrencyLimiter::new(2);
542        let counter = Arc::new(tokio::sync::Mutex::new(0));
543
544        let operations: Vec<_> = (0..5)
545            .map(|_| {
546                let counter = counter.clone();
547                let limiter = limiter.clone();
548
549                tokio::spawn(async move {
550                    limiter
551                        .execute(|| async {
552                            let mut count = counter.lock().await;
553                            *count += 1;
554                            Ok::<_, AdaptiveNetworkError>(())
555                        })
556                        .await
557                })
558            })
559            .collect();
560
561        for op in operations {
562            op.await.unwrap().unwrap();
563        }
564
565        assert_eq!(*counter.lock().await, 5);
566    }
567
568    #[test]
569    fn test_performance_cache() {
570        let cache = PerformanceCache::new(CacheConfig {
571            max_entries: 2,
572            ttl: Duration::from_secs(1),
573            compression: false,
574        });
575
576        cache.insert("key1", "value1");
577        cache.insert("key2", "value2");
578
579        assert_eq!(cache.get(&"key1"), Some("value1"));
580        assert_eq!(cache.get(&"key2"), Some("value2"));
581
582        // Add third item, should evict oldest
583        cache.insert("key3", "value3");
584        assert_eq!(cache.get(&"key3"), Some("value3"));
585    }
586
587    #[test]
588    fn test_performance_monitor() {
589        let monitor = PerformanceMonitor::new();
590
591        monitor.start_operation("test_op");
592        std::thread::sleep(Duration::from_millis(10));
593        monitor.end_operation("test_op");
594
595        let stats = monitor.get_stats("test_op").unwrap();
596        assert_eq!(stats.count, 1);
597        assert!(stats.avg_duration >= Duration::from_millis(10));
598    }
599}