use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Semaphore};
use tokio::task::yield_now;
use tracing::{debug, info, warn};

use crate::StreamEvent;

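/// Tuning knobs for the performance utilities in this module. The defaults are
/// starting points only; `optimal_batch_size` is an initial value that is
/// re-tuned at runtime when `enable_batch_optimization` is set.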
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceUtilsConfig {
    pub enable_zero_copy: bool,
    pub enable_simd: bool,
    pub enable_memory_pooling: bool,
    pub enable_batch_optimization: bool,
    pub optimal_batch_size: usize,
    pub enable_adaptive_rate_limiting: bool,
    pub enable_prefetching: bool,
    pub cpu_cores: usize,
}

impl Default for PerformanceUtilsConfig {
    fn default() -> Self {
        Self {
            enable_zero_copy: true,
            enable_simd: true,
            enable_memory_pooling: true,
            enable_batch_optimization: true,
            optimal_batch_size: 1000,
            enable_adaptive_rate_limiting: true,
            enable_prefetching: true,
            cpu_cores: num_cpus::get(),
        }
    }
}

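/// Batches incoming [`StreamEvent`]s and adaptively tunes its batch size so
/// that flush latency tracks `target_latency`: a batch is emitted when the
/// buffer reaches the current optimal size or when the target latency has
/// elapsed since the last flush.
///
/// A sketch of the intended call pattern (types are the ones defined in this
/// crate; error handling is elided):
///
/// ```ignore
/// let mut batcher = AdaptiveBatcher::new(
///     PerformanceUtilsConfig::default(),
///     Duration::from_millis(100),
/// );
/// if let Some(batch) = batcher.add_event(event).await? {
///     // dispatch the full batch downstream
/// }
/// if let Some(rest) = batcher.flush().await? {
///     // dispatch whatever was still buffered
/// }
/// ```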
pub struct AdaptiveBatcher {
    config: PerformanceUtilsConfig,
    batch_buffer: Arc<RwLock<Vec<StreamEvent>>>,
    batch_stats: Arc<RwLock<BatchingStats>>,
    last_flush: Instant,
    target_latency: Duration,
    optimal_batch_size: Arc<RwLock<usize>>,
}

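/// Rolling statistics for [`AdaptiveBatcher`]. The averages use a simple
/// two-point smoothing (the new value averaged with the previous average),
/// not a cumulative mean.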
#[derive(Debug, Clone, Default)]
pub struct BatchingStats {
    pub total_batches: u64,
    pub total_events: u64,
    pub avg_batch_size: f64,
    pub avg_latency_ms: f64,
    pub throughput_events_per_sec: f64,
    pub efficiency_score: f64,
}

impl AdaptiveBatcher {
    pub fn new(config: PerformanceUtilsConfig, target_latency: Duration) -> Self {
        Self {
            optimal_batch_size: Arc::new(RwLock::new(config.optimal_batch_size)),
            config,
            batch_buffer: Arc::new(RwLock::new(Vec::new())),
            batch_stats: Arc::new(RwLock::new(BatchingStats::default())),
            last_flush: Instant::now(),
            target_latency,
        }
    }

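    /// Buffers an event and returns `Ok(Some(batch))` when either the optimal
    /// batch size is reached or `target_latency` has elapsed since the last
    /// flush; otherwise returns `Ok(None)` and keeps buffering.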
    pub async fn add_event(&mut self, event: StreamEvent) -> Result<Option<Vec<StreamEvent>>> {
        let mut buffer = self.batch_buffer.write().await;
        buffer.push(event);

        let optimal_size = *self.optimal_batch_size.read().await;
        let time_since_last_flush = self.last_flush.elapsed();

        if buffer.len() >= optimal_size || time_since_last_flush >= self.target_latency {
            let batch = std::mem::take(&mut *buffer);
            self.last_flush = Instant::now();

            self.update_stats_and_optimize(batch.len(), time_since_last_flush)
                .await;

            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }

    pub async fn flush(&mut self) -> Result<Option<Vec<StreamEvent>>> {
        let mut buffer = self.batch_buffer.write().await;
        if buffer.is_empty() {
            return Ok(None);
        }

        let batch = std::mem::take(&mut *buffer);
        let time_since_last_flush = self.last_flush.elapsed();
        self.last_flush = Instant::now();

        self.update_stats_and_optimize(batch.len(), time_since_last_flush)
            .await;

        Ok(Some(batch))
    }

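    /// Updates the rolling statistics and, when batch optimization is enabled,
    /// adjusts the optimal batch size by roughly 10%: it shrinks while above
    /// 100 if the smoothed latency exceeds the target, and grows while below
    /// 10000 if latency is under half the target.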
    async fn update_stats_and_optimize(&self, batch_size: usize, latency: Duration) {
        let mut stats = self.batch_stats.write().await;
        stats.total_batches += 1;
        stats.total_events += batch_size as u64;

        let new_avg_batch_size = (stats.avg_batch_size + batch_size as f64) / 2.0;
        let new_avg_latency = (stats.avg_latency_ms + latency.as_millis() as f64) / 2.0;

        stats.avg_batch_size = new_avg_batch_size;
        stats.avg_latency_ms = new_avg_latency;

        if latency.as_secs_f64() > 0.0 {
            stats.throughput_events_per_sec = batch_size as f64 / latency.as_secs_f64();
        }

        let latency_efficiency = if new_avg_latency > 0.0 {
            1.0 / new_avg_latency
        } else {
            1.0
        };
        stats.efficiency_score = new_avg_batch_size * latency_efficiency;

        if self.config.enable_batch_optimization {
            let mut optimal_size = self.optimal_batch_size.write().await;

            if new_avg_latency > self.target_latency.as_millis() as f64 && *optimal_size > 100 {
                *optimal_size = (*optimal_size * 9) / 10;
                debug!(
                    "Reduced optimal batch size to {} due to high latency",
                    *optimal_size
                );
            } else if new_avg_latency < self.target_latency.as_millis() as f64 / 2.0
                && *optimal_size < 10000
            {
                *optimal_size = (*optimal_size * 11) / 10;
                debug!(
                    "Increased optimal batch size to {} due to good latency",
                    *optimal_size
                );
            }
        }
    }

    pub async fn get_stats(&self) -> BatchingStats {
        self.batch_stats.read().await.clone()
    }

    pub async fn get_optimal_batch_size(&self) -> usize {
        *self.optimal_batch_size.read().await
    }
}

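/// A simple keyed object pool: objects are grouped by pool name, reused when
/// available, and otherwise created through the supplied factory. Each pool
/// holds at most `max_pool_size` idle objects.
///
/// A sketch of typical use (the pool name and payload type here are
/// arbitrary examples):
///
/// ```ignore
/// let pool: IntelligentMemoryPool<Vec<u8>> = IntelligentMemoryPool::new(64);
/// let buf = pool.get_or_create("event_buffers", || Vec::with_capacity(4096)).await;
/// // ... use buf ...
/// pool.return_to_pool("event_buffers", buf).await;
/// ```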
pub struct IntelligentMemoryPool<T> {
    pools: Arc<RwLock<HashMap<String, VecDeque<T>>>>,
    max_pool_size: usize,
    allocation_stats: Arc<RwLock<PoolStats>>,
}

#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    pub total_allocations: u64,
    pub pool_hits: u64,
    pub pool_misses: u64,
    pub hit_rate: f64,
    pub memory_saved_bytes: u64,
}

impl<T> IntelligentMemoryPool<T> {
    pub fn new(max_pool_size: usize) -> Self {
        Self {
            pools: Arc::new(RwLock::new(HashMap::new())),
            max_pool_size,
            allocation_stats: Arc::new(RwLock::new(PoolStats::default())),
        }
    }

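    /// Returns an object from the named pool if one is available; otherwise
    /// invokes `factory` and records the allocation as a pool miss.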
    pub async fn get_or_create<F>(&self, pool_name: &str, factory: F) -> T
    where
        F: FnOnce() -> T,
    {
        let mut stats = self.allocation_stats.write().await;
        stats.total_allocations += 1;

        {
            let mut pools = self.pools.write().await;
            if let Some(pool) = pools.get_mut(pool_name) {
                if let Some(obj) = pool.pop_front() {
                    stats.pool_hits += 1;
                    stats.hit_rate = (stats.pool_hits as f64) / (stats.total_allocations as f64);
                    return obj;
                }
            }
        }

        stats.pool_misses += 1;
        stats.hit_rate = (stats.pool_hits as f64) / (stats.total_allocations as f64);

        factory()
    }

    pub async fn return_to_pool(&self, pool_name: &str, obj: T) {
        let mut pools = self.pools.write().await;
        let pool = pools
            .entry(pool_name.to_string())
            .or_insert_with(VecDeque::new);

        if pool.len() < self.max_pool_size {
            pool.push_back(obj);
        }
    }

    pub async fn get_stats(&self) -> PoolStats {
        self.allocation_stats.read().await.clone()
    }

    pub async fn clear_all_pools(&self) {
        let mut pools = self.pools.write().await;
        pools.clear();

        let mut stats = self.allocation_stats.write().await;
        *stats = PoolStats::default();
    }
}

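/// Semaphore-based rate limiter that adjusts its configured request rate from
/// observed latency and success rate. Note that only `max_requests_per_second`
/// in the config is adjusted; the semaphore keeps its original burst capacity.
///
/// A sketch of typical use (`do_request` stands in for the caller's own
/// operation; latency measurement is up to the caller):
///
/// ```ignore
/// let limiter = AdaptiveRateLimiter::new(RateLimitConfig::default());
/// let _permit = limiter.acquire_permit().await?;
/// let started = Instant::now();
/// let ok = do_request().await.is_ok();
/// limiter.record_performance(started.elapsed().as_millis() as f64, ok).await?;
/// ```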
pub struct AdaptiveRateLimiter {
    permits: Arc<Semaphore>,
    config: Arc<RwLock<RateLimitConfig>>,
    performance_history: Arc<RwLock<VecDeque<PerformanceSnapshot>>>,
    last_adjustment: Arc<RwLock<Instant>>,
}

#[derive(Debug, Clone)]
pub struct RateLimitConfig {
    pub max_requests_per_second: usize,
    pub burst_capacity: usize,
    pub adjustment_interval: Duration,
    pub target_latency_ms: f64,
    pub max_adjustment_factor: f64,
}

impl Default for RateLimitConfig {
    fn default() -> Self {
        Self {
            max_requests_per_second: 10000,
            burst_capacity: 1000,
            adjustment_interval: Duration::from_secs(5),
            target_latency_ms: 10.0,
            max_adjustment_factor: 2.0,
        }
    }
}

#[derive(Debug, Clone)]
struct PerformanceSnapshot {
    timestamp: Instant,
    latency_ms: f64,
    throughput_rps: f64,
    success_rate: f64,
}

impl AdaptiveRateLimiter {
    pub fn new(config: RateLimitConfig) -> Self {
        let permits = Arc::new(Semaphore::new(config.burst_capacity));

        Self {
            permits,
            config: Arc::new(RwLock::new(config)),
            performance_history: Arc::new(RwLock::new(VecDeque::new())),
            last_adjustment: Arc::new(RwLock::new(Instant::now())),
        }
    }

    pub async fn acquire_permit(&self) -> Result<tokio::sync::SemaphorePermit<'_>> {
        match self.permits.try_acquire() {
            Ok(permit) => Ok(permit),
            Err(_) => {
                warn!("Rate limit reached, applying backpressure");
                self.permits
                    .acquire()
                    .await
                    .map_err(|e| anyhow!("Failed to acquire permit: {}", e))
            }
        }
    }

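    /// Records one observation. Keeps only the most recent 100 snapshots and,
    /// once `adjustment_interval` has elapsed since the last adjustment,
    /// re-tunes the configured rate from the averaged history.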
    pub async fn record_performance(&self, latency_ms: f64, success: bool) -> Result<()> {
        let snapshot = PerformanceSnapshot {
            timestamp: Instant::now(),
            latency_ms,
            // Per-call throughput is not measured here; it is left at zero.
            throughput_rps: 0.0,
            success_rate: if success { 1.0 } else { 0.0 },
        };

        {
            let mut history = self.performance_history.write().await;
            history.push_back(snapshot);

            // Keep only the most recent 100 snapshots.
            if history.len() > 100 {
                history.pop_front();
            }
        }

        {
            let last_adjustment = self.last_adjustment.read().await;
            let config = self.config.read().await;

            if last_adjustment.elapsed() >= config.adjustment_interval {
                drop(last_adjustment);
                drop(config);
                self.adjust_rate_limits().await?;
            }
        }

        Ok(())
    }

    async fn adjust_rate_limits(&self) -> Result<()> {
        let mut last_adjustment = self.last_adjustment.write().await;
        *last_adjustment = Instant::now();
        drop(last_adjustment);

        let history = self.performance_history.read().await;
        if history.len() < 10 {
            return Ok(());
        }

        let avg_latency: f64 =
            history.iter().map(|s| s.latency_ms).sum::<f64>() / history.len() as f64;
        let avg_success_rate: f64 =
            history.iter().map(|s| s.success_rate).sum::<f64>() / history.len() as f64;

        let mut config = self.config.write().await;
        let current_rate = config.max_requests_per_second;

        let adjustment_factor =
            if avg_latency > config.target_latency_ms * 1.5 || avg_success_rate < 0.95 {
                0.9
            } else if avg_latency < config.target_latency_ms * 0.5 && avg_success_rate > 0.98 {
                1.1
            } else {
                1.0
            };

        if adjustment_factor != 1.0 {
            let new_rate = ((current_rate as f64) * adjustment_factor) as usize;
            let max_rate = ((current_rate as f64) * config.max_adjustment_factor) as usize;
            let min_rate = ((current_rate as f64) / config.max_adjustment_factor) as usize;

            config.max_requests_per_second = new_rate.clamp(min_rate, max_rate);

            info!(
                "Adjusted rate limit from {} to {} req/s (factor: {:.2}, avg_latency: {:.2}ms, success_rate: {:.2}%)",
                current_rate, config.max_requests_per_second, adjustment_factor, avg_latency, avg_success_rate * 100.0
            );
        }

        Ok(())
    }

    pub async fn get_config(&self) -> RateLimitConfig {
        self.config.read().await.clone()
    }
}

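/// Fans events out to Tokio tasks, bounded to `cpu_cores * 2` concurrent
/// workers by a semaphore, and tracks throughput and concurrency statistics.
///
/// A sketch of typical use (the handler body is illustrative; `handle_event`
/// is a caller-provided function, not part of this module):
///
/// ```ignore
/// let processor = ParallelStreamProcessor::new(PerformanceUtilsConfig::default());
/// let results = processor
///     .process_parallel(events, |event| async move {
///         handle_event(event).await // caller-provided handler
///     })
///     .await?;
/// ```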
pub struct ParallelStreamProcessor {
    config: PerformanceUtilsConfig,
    worker_semaphore: Arc<Semaphore>,
    processing_stats: Arc<RwLock<ProcessingStats>>,
}

#[derive(Debug, Clone, Default)]
pub struct ProcessingStats {
    pub events_processed: u64,
    pub avg_processing_time_ms: f64,
    pub peak_concurrency: usize,
    pub current_concurrency: usize,
    pub throughput_events_per_sec: f64,
    pub cpu_efficiency: f64,
}

impl ParallelStreamProcessor {
    pub fn new(config: PerformanceUtilsConfig) -> Self {
        let worker_semaphore = Arc::new(Semaphore::new(config.cpu_cores * 2));

        Self {
            config,
            worker_semaphore,
            processing_stats: Arc::new(RwLock::new(ProcessingStats::default())),
        }
    }

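    /// Spawns one task per event, limited by the worker semaphore, and returns
    /// one result per event in input order. Join errors are mapped into `Err`
    /// entries rather than aborting the whole call.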
    pub async fn process_parallel<F, Fut>(
        &self,
        events: Vec<StreamEvent>,
        processor: F,
    ) -> Result<Vec<Result<()>>>
    where
        F: Fn(StreamEvent) -> Fut + Send + Sync + Clone + 'static,
        Fut: std::future::Future<Output = Result<()>> + Send + 'static,
    {
        let start_time = Instant::now();
        let event_count = events.len();

        {
            let mut stats = self.processing_stats.write().await;
            stats.current_concurrency = event_count.min(self.config.cpu_cores * 2);
            stats.peak_concurrency = stats.peak_concurrency.max(stats.current_concurrency);
        }

        let mut handles = Vec::new();

        for event in events {
            let permit = self.worker_semaphore.clone().acquire_owned().await?;
            let processor_clone = processor.clone();

            let handle = tokio::spawn(async move {
                // Hold the permit for the lifetime of this task to bound concurrency.
                let _permit = permit;
                let result = processor_clone(event).await;
                // Yield so other spawned tasks get a chance to run.
                yield_now().await;
                result
            });

            handles.push(handle);
        }

        let mut results = Vec::new();
        for handle in handles {
            match handle.await {
                Ok(result) => results.push(result),
                Err(e) => results.push(Err(anyhow!("Task join error: {}", e))),
            }
        }

        let processing_time = start_time.elapsed();
        {
            let mut stats = self.processing_stats.write().await;
            stats.events_processed += event_count as u64;
            stats.avg_processing_time_ms =
                (stats.avg_processing_time_ms + processing_time.as_millis() as f64) / 2.0;
            stats.current_concurrency = 0;

            if processing_time.as_secs_f64() > 0.0 {
                stats.throughput_events_per_sec =
                    event_count as f64 / processing_time.as_secs_f64();
            }

            // Rough heuristic: treat one event per core per second as the ideal rate.
            let ideal_time = (event_count as f64) / (self.config.cpu_cores as f64);
            let actual_time = processing_time.as_secs_f64();
            stats.cpu_efficiency = if actual_time > 0.0 {
                (ideal_time / actual_time).min(1.0)
            } else {
                1.0
            };
        }

        debug!(
            "Processed {} events in {:?} ({:.2} events/sec)",
            event_count,
            processing_time,
            event_count as f64 / processing_time.as_secs_f64()
        );

        Ok(results)
    }

    pub async fn get_stats(&self) -> ProcessingStats {
        self.processing_stats.read().await.clone()
    }
}

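/// Cache with per-key access tracking. Keys are scored by access frequency,
/// the least recently used entry is evicted when the cache is full, and
/// related-key prefetching is currently only logged rather than performed.
///
/// A sketch of typical use (`load_from_store` stands in for whatever fetches
/// the real data):
///
/// ```ignore
/// let prefetcher: IntelligentPrefetcher<String> = IntelligentPrefetcher::new(1024);
/// let value = prefetcher
///     .get_with_prefetch("some_key", |key| async move { load_from_store(&key).await })
///     .await?;
/// ```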
pub struct IntelligentPrefetcher<T> {
    cache: Arc<RwLock<HashMap<String, T>>>,
    access_patterns: Arc<RwLock<HashMap<String, AccessPattern>>>,
    max_cache_size: usize,
}

#[derive(Debug, Clone)]
struct AccessPattern {
    access_count: u64,
    last_access: Instant,
    prediction_score: f64,
    related_keys: Vec<String>,
}

impl<T: Clone> IntelligentPrefetcher<T> {
    pub fn new(max_cache_size: usize) -> Self {
        Self {
            cache: Arc::new(RwLock::new(HashMap::new())),
            access_patterns: Arc::new(RwLock::new(HashMap::new())),
            max_cache_size,
        }
    }

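    /// Returns the cached value for `key`, or loads it via `loader`, evicting
    /// the least recently used entry first if the cache is at capacity.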
    pub async fn get_with_prefetch<F, Fut>(&self, key: &str, loader: F) -> Result<T>
    where
        F: FnOnce(String) -> Fut + Send,
        Fut: std::future::Future<Output = Result<T>> + Send,
        T: Send + Sync,
    {
        {
            let cache = self.cache.read().await;
            if let Some(data) = cache.get(key) {
                self.update_access_pattern(key).await;
                return Ok(data.clone());
            }
        }

        let data = loader(key.to_string()).await?;

        // Evict before taking the cache write lock: `evict_lru` acquires the
        // same lock, and re-acquiring it while already held would deadlock.
        let needs_eviction = {
            let cache = self.cache.read().await;
            cache.len() >= self.max_cache_size
        };
        if needs_eviction {
            self.evict_lru().await;
        }

        {
            let mut cache = self.cache.write().await;
            cache.insert(key.to_string(), data.clone());
        }

        self.update_access_pattern(key).await;
        self.trigger_intelligent_prefetch(key).await;

        Ok(data)
    }

    async fn update_access_pattern(&self, key: &str) {
        let mut patterns = self.access_patterns.write().await;
        let pattern = patterns
            .entry(key.to_string())
            .or_insert_with(|| AccessPattern {
                access_count: 0,
                last_access: Instant::now(),
                prediction_score: 0.0,
                related_keys: Vec::new(),
            });

        pattern.access_count += 1;
        pattern.last_access = Instant::now();

        // Score combines a (currently constant) recency factor with log frequency.
        let recency_factor = 1.0;
        let frequency_factor = (pattern.access_count as f64).ln();
        pattern.prediction_score = recency_factor * frequency_factor;
    }

    async fn trigger_intelligent_prefetch(&self, accessed_key: &str) {
        let patterns = self.access_patterns.read().await;

        if let Some(pattern) = patterns.get(accessed_key) {
            for related_key in &pattern.related_keys {
                if let Some(related_pattern) = patterns.get(related_key) {
                    if related_pattern.prediction_score > 0.5 {
                        debug!("Would prefetch related key: {}", related_key);
                    }
                }
            }
        }
    }

    async fn evict_lru(&self) {
        let patterns = self.access_patterns.read().await;

        if let Some((lru_key, _)) = patterns
            .iter()
            .min_by_key(|(_, pattern)| pattern.last_access)
        {
            let lru_key = lru_key.clone();
            drop(patterns);

            let mut cache = self.cache.write().await;
            cache.remove(&lru_key);

            let mut patterns = self.access_patterns.write().await;
            patterns.remove(&lru_key);
        }
    }

    pub async fn get_cache_stats(&self) -> (usize, usize) {
        let cache_size = self.cache.read().await.len();
        let pattern_count = self.access_patterns.read().await.len();
        (cache_size, pattern_count)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    #[tokio::test]
    async fn test_adaptive_batcher() {
        let config = PerformanceUtilsConfig::default();
        let mut batcher = AdaptiveBatcher::new(config, Duration::from_millis(100));

        let event = crate::event::StreamEvent::TripleAdded {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "\"test_object\"".to_string(),
            graph: None,
            metadata: crate::event::EventMetadata::default(),
        };

        for i in 0..10 {
            let result = batcher.add_event(event.clone()).await.unwrap();
            if i == 9 {
                // Ten events are well below the default batch size, so nothing flushes yet.
                assert!(result.is_none());
            }
        }

        let batch = batcher.flush().await.unwrap();
        assert!(batch.is_some());
        assert_eq!(batch.unwrap().len(), 10);

        let stats = batcher.get_stats().await;
        assert_eq!(stats.total_batches, 1);
        assert_eq!(stats.total_events, 10);
    }

    #[tokio::test]
    async fn test_memory_pool() {
        let pool: IntelligentMemoryPool<String> = IntelligentMemoryPool::new(10);

        let obj1 = pool
            .get_or_create("test_pool", || "test_string".to_string())
            .await;
        assert_eq!(obj1, "test_string");

        pool.return_to_pool("test_pool", obj1).await;

        let obj2 = pool
            .get_or_create("test_pool", || "new_string".to_string())
            .await;
        // The pooled object is reused instead of calling the factory again.
        assert_eq!(obj2, "test_string");

        let stats = pool.get_stats().await;
        assert_eq!(stats.pool_hits, 1);
        assert_eq!(stats.total_allocations, 2);
    }

    #[tokio::test]
    async fn test_adaptive_rate_limiter() {
        let config = RateLimitConfig {
            max_requests_per_second: 10,
            burst_capacity: 5,
            adjustment_interval: Duration::from_millis(100),
            target_latency_ms: 10.0,
            max_adjustment_factor: 2.0,
        };

        let limiter = AdaptiveRateLimiter::new(config);

        for _ in 0..5 {
            let _permit = limiter.acquire_permit().await.unwrap();
            limiter.record_performance(5.0, true).await.unwrap();
        }

        sleep(Duration::from_millis(150)).await;

        // With fewer than 10 samples no adjustment is made, so the rate stays at 10.
        let final_config = limiter.get_config().await;
        assert!(final_config.max_requests_per_second >= 10);
    }

    #[tokio::test]
    async fn test_parallel_processor() {
        let config = PerformanceUtilsConfig::default();
        let processor = ParallelStreamProcessor::new(config);

        let events = vec![
            crate::event::StreamEvent::TripleAdded {
                subject: "http://example.org/subject1".to_string(),
                predicate: "http://example.org/predicate".to_string(),
                object: "\"test_object1\"".to_string(),
                graph: None,
                metadata: crate::event::EventMetadata::default(),
            },
            crate::event::StreamEvent::TripleAdded {
                subject: "http://example.org/subject2".to_string(),
                predicate: "http://example.org/predicate".to_string(),
                object: "\"test_object2\"".to_string(),
                graph: None,
                metadata: crate::event::EventMetadata::default(),
            },
        ];

        let results = processor
            .process_parallel(events, |_event| async {
                sleep(Duration::from_millis(10)).await;
                Ok(())
            })
            .await
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.is_ok()));

        let stats = processor.get_stats().await;
        assert_eq!(stats.events_processed, 2);
    }

    #[tokio::test]
    async fn test_intelligent_prefetcher() {
        let prefetcher: IntelligentPrefetcher<String> = IntelligentPrefetcher::new(10);

        let data1 = prefetcher
            .get_with_prefetch("key1", |key| async move { Ok(format!("data_for_{key}")) })
            .await
            .unwrap();

        assert_eq!(data1, "data_for_key1");

        let data2 = prefetcher
            .get_with_prefetch("key1", |_key| async move {
                Ok("should_not_be_called".to_string())
            })
            .await
            .unwrap();

        assert_eq!(data2, "data_for_key1");

        let (cache_size, pattern_count) = prefetcher.get_cache_stats().await;
        assert_eq!(cache_size, 1);
        assert_eq!(pattern_count, 1);
    }
}