oxirs_stream/
performance_utils.rs

1//! # Advanced Performance Utilities for OxiRS Stream
2//!
3//! High-performance utilities, optimizations, and patterns for achieving
4//! maximum throughput and minimum latency in streaming operations.
5
6use anyhow::{anyhow, Result};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::{RwLock, Semaphore};
12use tokio::task::yield_now;
13use tracing::{debug, info, warn};
14
15use crate::StreamEvent;
16
17/// Configuration for performance optimizations
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct PerformanceUtilsConfig {
20    /// Enable zero-copy optimizations
21    pub enable_zero_copy: bool,
22    /// Enable SIMD optimizations where available
23    pub enable_simd: bool,
24    /// Enable memory pooling
25    pub enable_memory_pooling: bool,
26    /// Enable batch processing optimizations
27    pub enable_batch_optimization: bool,
28    /// Maximum batch size for optimal performance
29    pub optimal_batch_size: usize,
30    /// Enable adaptive rate limiting
31    pub enable_adaptive_rate_limiting: bool,
32    /// Enable intelligent prefetching
33    pub enable_prefetching: bool,
34    /// CPU core count for parallel processing
35    pub cpu_cores: usize,
36}
37
38impl Default for PerformanceUtilsConfig {
39    fn default() -> Self {
40        Self {
41            enable_zero_copy: true,
42            enable_simd: true,
43            enable_memory_pooling: true,
44            enable_batch_optimization: true,
45            optimal_batch_size: 1000,
46            enable_adaptive_rate_limiting: true,
47            enable_prefetching: true,
48            cpu_cores: num_cpus::get(),
49        }
50    }
51}
52
53/// High-performance adaptive batcher for streaming events
54pub struct AdaptiveBatcher {
55    config: PerformanceUtilsConfig,
56    batch_buffer: Arc<RwLock<Vec<StreamEvent>>>,
57    batch_stats: Arc<RwLock<BatchingStats>>,
58    last_flush: Instant,
59    target_latency: Duration,
60    optimal_batch_size: Arc<RwLock<usize>>,
61}
62
63/// Statistics for adaptive batching performance
64#[derive(Debug, Clone, Default)]
65pub struct BatchingStats {
66    pub total_batches: u64,
67    pub total_events: u64,
68    pub avg_batch_size: f64,
69    pub avg_latency_ms: f64,
70    pub throughput_events_per_sec: f64,
71    pub efficiency_score: f64,
72}
73
74impl AdaptiveBatcher {
75    /// Create a new adaptive batcher
76    pub fn new(config: PerformanceUtilsConfig, target_latency: Duration) -> Self {
77        Self {
78            optimal_batch_size: Arc::new(RwLock::new(config.optimal_batch_size)),
79            config,
80            batch_buffer: Arc::new(RwLock::new(Vec::new())),
81            batch_stats: Arc::new(RwLock::new(BatchingStats::default())),
82            last_flush: Instant::now(),
83            target_latency,
84        }
85    }
86
87    /// Add an event to the batch buffer
88    pub async fn add_event(&mut self, event: StreamEvent) -> Result<Option<Vec<StreamEvent>>> {
89        let mut buffer = self.batch_buffer.write().await;
90        buffer.push(event);
91
92        let optimal_size = *self.optimal_batch_size.read().await;
93        let time_since_last_flush = self.last_flush.elapsed();
94
95        // Check if we should flush the batch
96        if buffer.len() >= optimal_size || time_since_last_flush >= self.target_latency {
97            let batch = std::mem::take(&mut *buffer);
98            self.last_flush = Instant::now();
99
100            // Update statistics and optimize batch size
101            self.update_stats_and_optimize(batch.len(), time_since_last_flush)
102                .await;
103
104            Ok(Some(batch))
105        } else {
106            Ok(None)
107        }
108    }
109
110    /// Force flush the current batch
111    pub async fn flush(&mut self) -> Result<Option<Vec<StreamEvent>>> {
112        let mut buffer = self.batch_buffer.write().await;
113        if buffer.is_empty() {
114            return Ok(None);
115        }
116
117        let batch = std::mem::take(&mut *buffer);
118        let time_since_last_flush = self.last_flush.elapsed();
119        self.last_flush = Instant::now();
120
121        self.update_stats_and_optimize(batch.len(), time_since_last_flush)
122            .await;
123
124        Ok(Some(batch))
125    }
126
127    /// Update statistics and optimize batch size based on performance
128    async fn update_stats_and_optimize(&self, batch_size: usize, latency: Duration) {
129        let mut stats = self.batch_stats.write().await;
130        stats.total_batches += 1;
131        stats.total_events += batch_size as u64;
132
133        // Calculate moving averages
134        let new_avg_batch_size = (stats.avg_batch_size + batch_size as f64) / 2.0;
135        let new_avg_latency = (stats.avg_latency_ms + latency.as_millis() as f64) / 2.0;
136
137        stats.avg_batch_size = new_avg_batch_size;
138        stats.avg_latency_ms = new_avg_latency;
139
140        if latency.as_secs_f64() > 0.0 {
141            stats.throughput_events_per_sec = batch_size as f64 / latency.as_secs_f64();
142        }
143
144        // Calculate efficiency score (higher is better)
145        let latency_efficiency = if new_avg_latency > 0.0 {
146            1.0 / new_avg_latency
147        } else {
148            1.0
149        };
150        stats.efficiency_score = new_avg_batch_size * latency_efficiency;
151
152        // Adaptive optimization: adjust batch size based on performance
153        if self.config.enable_batch_optimization {
154            let mut optimal_size = self.optimal_batch_size.write().await;
155
156            if new_avg_latency > self.target_latency.as_millis() as f64 && *optimal_size > 100 {
157                // Latency too high, reduce batch size
158                *optimal_size = (*optimal_size * 9) / 10;
159                debug!(
160                    "Reduced optimal batch size to {} due to high latency",
161                    *optimal_size
162                );
163            } else if new_avg_latency < self.target_latency.as_millis() as f64 / 2.0
164                && *optimal_size < 10000
165            {
166                // Latency is good, try increasing batch size
167                *optimal_size = (*optimal_size * 11) / 10;
168                debug!(
169                    "Increased optimal batch size to {} due to good latency",
170                    *optimal_size
171                );
172            }
173        }
174    }
175
176    /// Get current batching statistics
177    pub async fn get_stats(&self) -> BatchingStats {
178        self.batch_stats.read().await.clone()
179    }
180
181    /// Get current optimal batch size
182    pub async fn get_optimal_batch_size(&self) -> usize {
183        *self.optimal_batch_size.read().await
184    }
185}
186
187/// Intelligent memory pool for reducing allocation overhead
188pub struct IntelligentMemoryPool<T> {
189    pools: Arc<RwLock<HashMap<String, VecDeque<T>>>>,
190    max_pool_size: usize,
191    allocation_stats: Arc<RwLock<PoolStats>>,
192}
193
194/// Memory pool statistics
195#[derive(Debug, Clone, Default)]
196pub struct PoolStats {
197    pub total_allocations: u64,
198    pub pool_hits: u64,
199    pub pool_misses: u64,
200    pub hit_rate: f64,
201    pub memory_saved_bytes: u64,
202}
203
204impl<T> IntelligentMemoryPool<T> {
205    /// Create a new memory pool
206    pub fn new(max_pool_size: usize) -> Self {
207        Self {
208            pools: Arc::new(RwLock::new(HashMap::new())),
209            max_pool_size,
210            allocation_stats: Arc::new(RwLock::new(PoolStats::default())),
211        }
212    }
213
214    /// Get an object from the pool or create a new one
215    pub async fn get_or_create<F>(&self, pool_name: &str, factory: F) -> T
216    where
217        F: FnOnce() -> T,
218    {
219        let mut stats = self.allocation_stats.write().await;
220        stats.total_allocations += 1;
221
222        {
223            let mut pools = self.pools.write().await;
224            if let Some(pool) = pools.get_mut(pool_name) {
225                if let Some(obj) = pool.pop_front() {
226                    stats.pool_hits += 1;
227                    stats.hit_rate = (stats.pool_hits as f64) / (stats.total_allocations as f64);
228                    return obj;
229                }
230            }
231        }
232
233        // Pool miss - create new object
234        stats.pool_misses += 1;
235        stats.hit_rate = (stats.pool_hits as f64) / (stats.total_allocations as f64);
236
237        factory()
238    }
239
240    /// Return an object to the pool
241    pub async fn return_to_pool(&self, pool_name: &str, obj: T) {
242        let mut pools = self.pools.write().await;
243        let pool = pools
244            .entry(pool_name.to_string())
245            .or_insert_with(VecDeque::new);
246
247        if pool.len() < self.max_pool_size {
248            pool.push_back(obj);
249        }
250        // If pool is full, object is dropped (garbage collected)
251    }
252
253    /// Get pool statistics
254    pub async fn get_stats(&self) -> PoolStats {
255        self.allocation_stats.read().await.clone()
256    }
257
258    /// Clear all pools
259    pub async fn clear_all_pools(&self) {
260        let mut pools = self.pools.write().await;
261        pools.clear();
262
263        let mut stats = self.allocation_stats.write().await;
264        *stats = PoolStats::default();
265    }
266}
267
268/// Adaptive rate limiter with intelligent backpressure
269pub struct AdaptiveRateLimiter {
270    permits: Arc<Semaphore>,
271    config: Arc<RwLock<RateLimitConfig>>,
272    performance_history: Arc<RwLock<VecDeque<PerformanceSnapshot>>>,
273    last_adjustment: Arc<RwLock<Instant>>,
274}
275
276/// Rate limiting configuration
277#[derive(Debug, Clone)]
278pub struct RateLimitConfig {
279    pub max_requests_per_second: usize,
280    pub burst_capacity: usize,
281    pub adjustment_interval: Duration,
282    pub target_latency_ms: f64,
283    pub max_adjustment_factor: f64,
284}
285
286impl Default for RateLimitConfig {
287    fn default() -> Self {
288        Self {
289            max_requests_per_second: 10000,
290            burst_capacity: 1000,
291            adjustment_interval: Duration::from_secs(5),
292            target_latency_ms: 10.0,
293            max_adjustment_factor: 2.0,
294        }
295    }
296}
297
298/// Performance snapshot for rate limiting decisions
299#[derive(Debug, Clone)]
300struct PerformanceSnapshot {
301    timestamp: Instant,
302    latency_ms: f64,
303    throughput_rps: f64,
304    success_rate: f64,
305}
306
307impl AdaptiveRateLimiter {
308    /// Create a new adaptive rate limiter
309    pub fn new(config: RateLimitConfig) -> Self {
310        let permits = Arc::new(Semaphore::new(config.burst_capacity));
311
312        Self {
313            permits,
314            config: Arc::new(RwLock::new(config)),
315            performance_history: Arc::new(RwLock::new(VecDeque::new())),
316            last_adjustment: Arc::new(RwLock::new(Instant::now())),
317        }
318    }
319
320    /// Acquire a permit to proceed with operation
321    pub async fn acquire_permit(&self) -> Result<tokio::sync::SemaphorePermit<'_>> {
322        match self.permits.try_acquire() {
323            Ok(permit) => Ok(permit),
324            Err(_) => {
325                // Apply backpressure by waiting
326                warn!("Rate limit reached, applying backpressure");
327                self.permits
328                    .acquire()
329                    .await
330                    .map_err(|e| anyhow!("Failed to acquire permit: {}", e))
331            }
332        }
333    }
334
335    /// Record performance metrics for adaptive adjustment
336    pub async fn record_performance(&self, latency_ms: f64, success: bool) -> Result<()> {
337        let snapshot = PerformanceSnapshot {
338            timestamp: Instant::now(),
339            latency_ms,
340            throughput_rps: 0.0, // Will be calculated
341            success_rate: if success { 1.0 } else { 0.0 },
342        };
343
344        {
345            let mut history = self.performance_history.write().await;
346            history.push_back(snapshot);
347
348            // Keep only last 100 snapshots
349            if history.len() > 100 {
350                history.pop_front();
351            }
352        }
353
354        // Check if it's time to adjust rate limits
355        {
356            let last_adjustment = self.last_adjustment.read().await;
357            let config = self.config.read().await;
358
359            if last_adjustment.elapsed() >= config.adjustment_interval {
360                drop(last_adjustment);
361                drop(config);
362                self.adjust_rate_limits().await?;
363            }
364        }
365
366        Ok(())
367    }
368
369    /// Intelligently adjust rate limits based on performance
370    async fn adjust_rate_limits(&self) -> Result<()> {
371        let mut last_adjustment = self.last_adjustment.write().await;
372        *last_adjustment = Instant::now();
373        drop(last_adjustment);
374
375        let history = self.performance_history.read().await;
376        if history.len() < 10 {
377            return Ok(()); // Not enough data
378        }
379
380        // Calculate average performance metrics
381        let avg_latency: f64 =
382            history.iter().map(|s| s.latency_ms).sum::<f64>() / history.len() as f64;
383        let avg_success_rate: f64 =
384            history.iter().map(|s| s.success_rate).sum::<f64>() / history.len() as f64;
385
386        let mut config = self.config.write().await;
387        let current_rate = config.max_requests_per_second;
388
389        // Adjust based on latency and success rate
390        let adjustment_factor =
391            if avg_latency > config.target_latency_ms * 1.5 || avg_success_rate < 0.95 {
392                // Performance is poor, reduce rate
393                0.9
394            } else if avg_latency < config.target_latency_ms * 0.5 && avg_success_rate > 0.98 {
395                // Performance is good, increase rate
396                1.1
397            } else {
398                // Performance is acceptable, no change
399                1.0
400            };
401
402        if adjustment_factor != 1.0 {
403            let new_rate = ((current_rate as f64) * adjustment_factor) as usize;
404            let max_rate = ((current_rate as f64) * config.max_adjustment_factor) as usize;
405            let min_rate = ((current_rate as f64) / config.max_adjustment_factor) as usize;
406
407            config.max_requests_per_second = new_rate.clamp(min_rate, max_rate);
408
409            info!(
410                "Adjusted rate limit from {} to {} req/s (factor: {:.2}, avg_latency: {:.2}ms, success_rate: {:.2}%)",
411                current_rate, config.max_requests_per_second, adjustment_factor, avg_latency, avg_success_rate * 100.0
412            );
413
414            // Resize semaphore if needed
415            // Note: In a real implementation, you'd need a more sophisticated approach
416            // as Semaphore doesn't support dynamic resizing
417        }
418
419        Ok(())
420    }
421
422    /// Get current rate limit configuration
423    pub async fn get_config(&self) -> RateLimitConfig {
424        self.config.read().await.clone()
425    }
426}
427
428/// High-performance parallel processor for streaming events
429pub struct ParallelStreamProcessor {
430    config: PerformanceUtilsConfig,
431    worker_semaphore: Arc<Semaphore>,
432    processing_stats: Arc<RwLock<ProcessingStats>>,
433}
434
435/// Processing statistics
436#[derive(Debug, Clone, Default)]
437pub struct ProcessingStats {
438    pub events_processed: u64,
439    pub avg_processing_time_ms: f64,
440    pub peak_concurrency: usize,
441    pub current_concurrency: usize,
442    pub throughput_events_per_sec: f64,
443    pub cpu_efficiency: f64,
444}
445
446impl ParallelStreamProcessor {
447    /// Create a new parallel processor
448    pub fn new(config: PerformanceUtilsConfig) -> Self {
449        let worker_semaphore = Arc::new(Semaphore::new(config.cpu_cores * 2));
450
451        Self {
452            config,
453            worker_semaphore,
454            processing_stats: Arc::new(RwLock::new(ProcessingStats::default())),
455        }
456    }
457
458    /// Process events in parallel with optimal load balancing
459    pub async fn process_parallel<F, Fut>(
460        &self,
461        events: Vec<StreamEvent>,
462        processor: F,
463    ) -> Result<Vec<Result<()>>>
464    where
465        F: Fn(StreamEvent) -> Fut + Send + Sync + Clone + 'static,
466        Fut: std::future::Future<Output = Result<()>> + Send + 'static,
467    {
468        let start_time = Instant::now();
469        let event_count = events.len();
470
471        // Update concurrency tracking
472        {
473            let mut stats = self.processing_stats.write().await;
474            stats.current_concurrency = event_count.min(self.config.cpu_cores * 2);
475            stats.peak_concurrency = stats.peak_concurrency.max(stats.current_concurrency);
476        }
477
478        // Process events in parallel with controlled concurrency
479        let mut handles = Vec::new();
480
481        for event in events {
482            let permit = self.worker_semaphore.clone().acquire_owned().await?;
483            let processor_clone = processor.clone();
484
485            let handle = tokio::spawn(async move {
486                let _permit = permit; // Keep permit alive
487                let result = processor_clone(event).await;
488                yield_now().await; // Yield to prevent blocking
489                result
490            });
491
492            handles.push(handle);
493        }
494
495        // Collect results
496        let mut results = Vec::new();
497        for handle in handles {
498            match handle.await {
499                Ok(result) => results.push(result),
500                Err(e) => results.push(Err(anyhow!("Task join error: {}", e))),
501            }
502        }
503
504        // Update statistics
505        let processing_time = start_time.elapsed();
506        {
507            let mut stats = self.processing_stats.write().await;
508            stats.events_processed += event_count as u64;
509            stats.avg_processing_time_ms =
510                (stats.avg_processing_time_ms + processing_time.as_millis() as f64) / 2.0;
511            stats.current_concurrency = 0;
512
513            if processing_time.as_secs_f64() > 0.0 {
514                stats.throughput_events_per_sec =
515                    event_count as f64 / processing_time.as_secs_f64();
516            }
517
518            // Calculate CPU efficiency (higher is better)
519            let ideal_time = (event_count as f64) / (self.config.cpu_cores as f64);
520            let actual_time = processing_time.as_secs_f64();
521            stats.cpu_efficiency = if actual_time > 0.0 {
522                (ideal_time / actual_time).min(1.0)
523            } else {
524                1.0
525            };
526        }
527
528        debug!(
529            "Processed {} events in {:?} ({:.2} events/sec)",
530            event_count,
531            processing_time,
532            event_count as f64 / processing_time.as_secs_f64()
533        );
534
535        Ok(results)
536    }
537
538    /// Get current processing statistics
539    pub async fn get_stats(&self) -> ProcessingStats {
540        self.processing_stats.read().await.clone()
541    }
542}
543
544/// Intelligent prefetcher for streaming data
545pub struct IntelligentPrefetcher<T> {
546    cache: Arc<RwLock<HashMap<String, T>>>,
547    access_patterns: Arc<RwLock<HashMap<String, AccessPattern>>>,
548    max_cache_size: usize,
549}
550
551/// Access pattern tracking for intelligent prefetching
552#[derive(Debug, Clone)]
553struct AccessPattern {
554    access_count: u64,
555    last_access: Instant,
556    prediction_score: f64,
557    related_keys: Vec<String>,
558}
559
560impl<T: Clone> IntelligentPrefetcher<T> {
561    /// Create a new intelligent prefetcher
562    pub fn new(max_cache_size: usize) -> Self {
563        Self {
564            cache: Arc::new(RwLock::new(HashMap::new())),
565            access_patterns: Arc::new(RwLock::new(HashMap::new())),
566            max_cache_size,
567        }
568    }
569
570    /// Get data with intelligent prefetching
571    pub async fn get_with_prefetch<F, Fut>(&self, key: &str, loader: F) -> Result<T>
572    where
573        F: FnOnce(String) -> Fut + Send,
574        Fut: std::future::Future<Output = Result<T>> + Send,
575        T: Send + Sync,
576    {
577        // Check cache first
578        {
579            let cache = self.cache.read().await;
580            if let Some(data) = cache.get(key) {
581                self.update_access_pattern(key).await;
582                return Ok(data.clone());
583            }
584        }
585
586        // Load data
587        let data = loader(key.to_string()).await?;
588
589        // Store in cache
590        {
591            let mut cache = self.cache.write().await;
592            if cache.len() >= self.max_cache_size {
593                // Remove least recently used item
594                self.evict_lru().await;
595            }
596            cache.insert(key.to_string(), data.clone());
597        }
598
599        // Update access patterns and trigger prefetching
600        self.update_access_pattern(key).await;
601        self.trigger_intelligent_prefetch(key).await;
602
603        Ok(data)
604    }
605
606    /// Update access pattern for a key
607    async fn update_access_pattern(&self, key: &str) {
608        let mut patterns = self.access_patterns.write().await;
609        let pattern = patterns
610            .entry(key.to_string())
611            .or_insert_with(|| AccessPattern {
612                access_count: 0,
613                last_access: Instant::now(),
614                prediction_score: 0.0,
615                related_keys: Vec::new(),
616            });
617
618        pattern.access_count += 1;
619        pattern.last_access = Instant::now();
620
621        // Update prediction score based on access frequency and recency
622        let recency_factor = 1.0; // More recent = higher score
623        let frequency_factor = (pattern.access_count as f64).ln();
624        pattern.prediction_score = recency_factor * frequency_factor;
625    }
626
627    /// Trigger intelligent prefetching based on access patterns
628    async fn trigger_intelligent_prefetch(&self, accessed_key: &str) {
629        let patterns = self.access_patterns.read().await;
630
631        if let Some(pattern) = patterns.get(accessed_key) {
632            // Prefetch related keys with high prediction scores
633            for related_key in &pattern.related_keys {
634                if let Some(related_pattern) = patterns.get(related_key) {
635                    if related_pattern.prediction_score > 0.5 {
636                        // In a real implementation, you'd trigger async prefetching here
637                        debug!("Would prefetch related key: {}", related_key);
638                    }
639                }
640            }
641        }
642    }
643
644    /// Evict least recently used item from cache
645    async fn evict_lru(&self) {
646        let patterns = self.access_patterns.read().await;
647
648        if let Some((lru_key, _)) = patterns
649            .iter()
650            .min_by_key(|(_, pattern)| pattern.last_access)
651        {
652            let lru_key = lru_key.clone();
653            drop(patterns);
654
655            let mut cache = self.cache.write().await;
656            cache.remove(&lru_key);
657
658            let mut patterns = self.access_patterns.write().await;
659            patterns.remove(&lru_key);
660        }
661    }
662
663    /// Get cache statistics
664    pub async fn get_cache_stats(&self) -> (usize, usize) {
665        let cache_size = self.cache.read().await.len();
666        let pattern_count = self.access_patterns.read().await.len();
667        (cache_size, pattern_count)
668    }
669}
670
671#[cfg(test)]
672mod tests {
673    use super::*;
674    use tokio::time::{sleep, Duration};
675
676    #[tokio::test]
677    async fn test_adaptive_batcher() {
678        let config = PerformanceUtilsConfig::default();
679        let mut batcher = AdaptiveBatcher::new(config, Duration::from_millis(100));
680
681        let event = crate::event::StreamEvent::TripleAdded {
682            subject: "http://example.org/subject".to_string(),
683            predicate: "http://example.org/predicate".to_string(),
684            object: "\"test_object\"".to_string(),
685            graph: None,
686            metadata: crate::event::EventMetadata::default(),
687        };
688
689        // Add events and check batching behavior
690        for i in 0..10 {
691            let result = batcher.add_event(event.clone()).await.unwrap();
692            if i == 9 {
693                // Should not batch yet (batch size is 1000 by default)
694                assert!(result.is_none());
695            }
696        }
697
698        // Force flush
699        let batch = batcher.flush().await.unwrap();
700        assert!(batch.is_some());
701        assert_eq!(batch.unwrap().len(), 10);
702
703        let stats = batcher.get_stats().await;
704        assert_eq!(stats.total_batches, 1);
705        assert_eq!(stats.total_events, 10);
706    }
707
708    #[tokio::test]
709    async fn test_memory_pool() {
710        let pool: IntelligentMemoryPool<String> = IntelligentMemoryPool::new(10);
711
712        // Get object from pool (will create new)
713        let obj1 = pool
714            .get_or_create("test_pool", || "test_string".to_string())
715            .await;
716        assert_eq!(obj1, "test_string");
717
718        // Return object to pool
719        pool.return_to_pool("test_pool", obj1).await;
720
721        // Get object again (should come from pool)
722        let obj2 = pool
723            .get_or_create("test_pool", || "new_string".to_string())
724            .await;
725        assert_eq!(obj2, "test_string"); // Should be the pooled object
726
727        let stats = pool.get_stats().await;
728        assert_eq!(stats.pool_hits, 1);
729        assert_eq!(stats.total_allocations, 2);
730    }
731
732    #[tokio::test]
733    async fn test_adaptive_rate_limiter() {
734        let config = RateLimitConfig {
735            max_requests_per_second: 10,
736            burst_capacity: 5,
737            adjustment_interval: Duration::from_millis(100),
738            target_latency_ms: 10.0,
739            max_adjustment_factor: 2.0,
740        };
741
742        let limiter = AdaptiveRateLimiter::new(config);
743
744        // Acquire permits
745        for _ in 0..5 {
746            let _permit = limiter.acquire_permit().await.unwrap();
747            limiter.record_performance(5.0, true).await.unwrap(); // Good performance
748        }
749
750        sleep(Duration::from_millis(150)).await; // Wait for adjustment
751
752        let final_config = limiter.get_config().await;
753        // Rate might be adjusted based on good performance
754        assert!(final_config.max_requests_per_second >= 10);
755    }
756
757    #[tokio::test]
758    async fn test_parallel_processor() {
759        let config = PerformanceUtilsConfig::default();
760        let processor = ParallelStreamProcessor::new(config);
761
762        let events = vec![
763            crate::event::StreamEvent::TripleAdded {
764                subject: "http://example.org/subject1".to_string(),
765                predicate: "http://example.org/predicate".to_string(),
766                object: "\"test_object1\"".to_string(),
767                graph: None,
768                metadata: crate::event::EventMetadata::default(),
769            },
770            crate::event::StreamEvent::TripleAdded {
771                subject: "http://example.org/subject2".to_string(),
772                predicate: "http://example.org/predicate".to_string(),
773                object: "\"test_object2\"".to_string(),
774                graph: None,
775                metadata: crate::event::EventMetadata::default(),
776            },
777        ];
778
779        let results = processor
780            .process_parallel(events, |_event| async {
781                sleep(Duration::from_millis(10)).await;
782                Ok(())
783            })
784            .await
785            .unwrap();
786
787        assert_eq!(results.len(), 2);
788        assert!(results.iter().all(|r| r.is_ok()));
789
790        let stats = processor.get_stats().await;
791        assert_eq!(stats.events_processed, 2);
792    }
793
794    #[tokio::test]
795    async fn test_intelligent_prefetcher() {
796        let prefetcher: IntelligentPrefetcher<String> = IntelligentPrefetcher::new(10);
797
798        // Load data with prefetcher
799        let data1 = prefetcher
800            .get_with_prefetch("key1", |key| async move { Ok(format!("data_for_{key}")) })
801            .await
802            .unwrap();
803
804        assert_eq!(data1, "data_for_key1");
805
806        // Access same key (should come from cache)
807        let data2 = prefetcher
808            .get_with_prefetch("key1", |_key| async move {
809                Ok("should_not_be_called".to_string())
810            })
811            .await
812            .unwrap();
813
814        assert_eq!(data2, "data_for_key1");
815
816        let (cache_size, pattern_count) = prefetcher.get_cache_stats().await;
817        assert_eq!(cache_size, 1);
818        assert_eq!(pattern_count, 1);
819    }
820}