1use crate::message::message::Message;
6use crate::network::error::{NetworkError, NetworkResult};
7use bytes::{Bytes, BytesMut};
8use flate2::{Compression, read::ZlibDecoder, write::ZlibEncoder};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::io::{Read, Write};
12use std::sync::Arc;
13use tokio::sync::{Mutex, RwLock, Semaphore};
14use tokio::time::{Duration, Instant};
15use uuid::Uuid;
16
/// Tunable knobs for the zero-copy / batching / compression pipeline.
///
/// Consumed by [`BatchProcessor`] and [`PerformanceOptimizer`]; see
/// [`PerformanceConfig::default`] for the baseline values.
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
    /// Enable the zero-copy layout passes (coalescing, compaction, alignment).
    pub zero_copy_enabled: bool,
    /// Enable grouping of messages into batches before processing.
    pub batch_processing_enabled: bool,
    /// Enable zlib compression of payloads/batches above `compression_threshold`.
    pub compression_enabled: bool,
    /// Maximum number of messages per batch before the batch is flushed.
    pub batch_size: usize,
    /// Maximum age of an in-progress batch before it is flushed
    /// (checked when a new message arrives).
    pub batch_timeout: Duration,
    /// Size in bytes above which compression is attempted.
    pub compression_threshold: usize,
    /// zlib compression level passed to the encoder.
    pub compression_level: Compression,
    /// Number of batches processed concurrently (semaphore permits).
    pub parallel_workers: usize,
    /// Memory-pool byte budget (carved into 1 KiB buffers by `BatchProcessor::new`).
    pub memory_pool_size: usize,
    /// Prefetch depth hint. NOTE(review): not read by any code visible in
    /// this file — confirm it is used elsewhere before relying on it.
    pub prefetch_size: usize,
}
41
42impl Default for PerformanceConfig {
43 fn default() -> Self {
44 Self {
45 zero_copy_enabled: true,
46 batch_processing_enabled: true,
47 compression_enabled: true,
48 batch_size: 1000,
49 batch_timeout: Duration::from_millis(10),
50 compression_threshold: 1024, compression_level: Compression::fast(),
52 parallel_workers: num_cpus::get(),
53 memory_pool_size: 1024 * 1024 * 64, prefetch_size: 16,
55 }
56 }
57}
58
/// An immutable view into shared byte storage.
///
/// Cloning or slicing produces new views over the same backing allocation,
/// so payload bytes are never copied. NOTE(review): `Bytes` is itself
/// reference-counted, so the extra `Arc` wrapper looks redundant — confirm
/// before changing the field type, since `slice`/`as_bytes` rely on it.
#[derive(Debug, Clone)]
pub struct ZeroCopyBuffer {
    /// Shared backing storage.
    data: Arc<Bytes>,
    /// Start of this view within `data`.
    offset: usize,
    /// Length of this view in bytes.
    length: usize,
}
69
70impl ZeroCopyBuffer {
71 pub fn new(data: Bytes) -> Self {
73 let length = data.len();
74 Self {
75 data: Arc::new(data),
76 offset: 0,
77 length,
78 }
79 }
80
81 pub fn slice(&self, start: usize, end: usize) -> NetworkResult<ZeroCopyBuffer> {
83 if end > start && end <= self.length {
84 Ok(ZeroCopyBuffer {
85 data: self.data.clone(),
86 offset: self.offset + start,
87 length: end - start,
88 })
89 } else {
90 Err(NetworkError::InvalidRange(format!(
91 "Invalid range: {}..{}",
92 start, end
93 )))
94 }
95 }
96
97 pub fn as_bytes(&self) -> Bytes {
99 self.data.slice(self.offset..self.offset + self.length)
100 }
101
102 pub fn len(&self) -> usize {
104 self.length
105 }
106
107 pub fn is_empty(&self) -> bool {
109 self.length == 0
110 }
111}
112
/// A group of messages processed together by [`BatchProcessor`].
#[derive(Debug)]
pub struct MessageBatch {
    /// Unique identifier, generated when the batch is opened.
    pub batch_id: Uuid,
    /// Messages accumulated into this batch.
    pub messages: Vec<ZeroCopyMessage>,
    /// When the batch was opened; used for the timeout-based flush.
    pub created_at: Instant,
    /// Sum of payload lengths in bytes (post-compression once compressed).
    pub total_size: usize,
    /// Set once `compress_batch` has run over this batch.
    pub is_compressed: bool,
}
127
/// A message whose payload is held in a [`ZeroCopyBuffer`].
///
/// `Clone` is cheap for the payload (shared storage) but deep for `headers`.
#[derive(Debug, Clone)]
pub struct ZeroCopyMessage {
    /// Unique message identifier.
    pub id: Uuid,
    /// Free-form string headers; also used by the optimization passes to
    /// tag messages (e.g. "coalesced_count", "simd_aligned").
    pub headers: HashMap<String, String>,
    /// Payload bytes, shared without copying.
    pub payload: ZeroCopyBuffer,
    /// Message timestamp in seconds — NOTE(review): derived from
    /// `timestamp.timestamp()` in `optimize_message`, so seconds, not ms.
    pub timestamp: u64,
    /// Routing/offset metadata.
    pub metadata: MessageMetadata,
}
142
/// Routing and compression metadata attached to each [`ZeroCopyMessage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageMetadata {
    /// Topic the message belongs to.
    pub topic: String,
    /// Partition within the topic.
    pub partition: u32,
    /// Offset within the partition.
    pub offset: u64,
    /// `true` once the payload has been zlib-compressed.
    pub compressed: bool,
    /// Pre-compression payload size, set when `compressed` is `true`.
    pub original_size: Option<usize>,
}
157
/// A fixed-capacity pool of reusable `BytesMut` buffers.
///
/// Acquisition falls back to fresh allocation when the pool is empty or a
/// pooled buffer is too small, so `acquire_buffer` never blocks on capacity.
pub struct MemoryPool {
    /// Maximum number of buffers retained by the pool.
    pool_size: usize,
    /// Buffers currently available for reuse.
    available_buffers: Arc<Mutex<VecDeque<BytesMut>>>,
    /// Count of buffers currently checked out of the pool.
    used_buffers: Arc<RwLock<usize>>,
    /// Hit/miss and usage counters.
    stats: Arc<RwLock<MemoryPoolStats>>,
}
169
/// Counters describing [`MemoryPool`] behaviour; snapshot via `get_stats`.
#[derive(Debug, Default, Clone)]
pub struct MemoryPoolStats {
    /// Total `acquire_buffer` calls.
    pub total_allocations: u64,
    /// Total `release_buffer` calls.
    pub total_deallocations: u64,
    /// Acquisitions served from the pool.
    pub pool_hits: u64,
    /// Acquisitions that fell back to a fresh allocation.
    pub pool_misses: u64,
    /// Number of buffers currently in use.
    pub current_memory_usage: usize,
    /// High-water mark of `current_memory_usage`.
    pub peak_memory_usage: usize,
}
186
187impl MemoryPool {
188 pub fn new(pool_size: usize, buffer_size: usize) -> Self {
190 let mut buffers = VecDeque::new();
191 for _ in 0..pool_size {
192 buffers.push_back(BytesMut::with_capacity(buffer_size));
193 }
194
195 Self {
196 pool_size,
197 available_buffers: Arc::new(Mutex::new(buffers)),
198 used_buffers: Arc::new(RwLock::new(0)),
199 stats: Arc::new(RwLock::new(MemoryPoolStats::default())),
200 }
201 }
202
203 pub async fn acquire_buffer(&self, size: usize) -> NetworkResult<BytesMut> {
205 let mut available = self.available_buffers.lock().await;
206 let mut stats = self.stats.write().await;
207
208 stats.total_allocations += 1;
209
210 if let Some(mut buffer) = available.pop_front() {
211 if buffer.capacity() >= size {
213 buffer.clear();
214 buffer.reserve(size);
215 stats.pool_hits += 1;
216 *self.used_buffers.write().await += 1;
217 Ok(buffer)
218 } else {
219 available.push_front(buffer); stats.pool_misses += 1;
222 Ok(BytesMut::with_capacity(size))
223 }
224 } else {
225 stats.pool_misses += 1;
227 Ok(BytesMut::with_capacity(size))
228 }
229 }
230
231 pub async fn release_buffer(&self, buffer: BytesMut) {
233 let mut available = self.available_buffers.lock().await;
234 let mut stats = self.stats.write().await;
235
236 stats.total_deallocations += 1;
237
238 if available.len() < self.pool_size {
239 available.push_back(buffer);
240 *self.used_buffers.write().await -= 1;
241 }
242 }
244
245 pub async fn get_stats(&self) -> MemoryPoolStats {
247 self.stats.read().await.clone()
248 }
249
250 pub async fn optimize(&self) -> NetworkResult<()> {
252 let mut stats = self.stats.write().await;
253 let mut available = self.available_buffers.lock().await;
254
255 let hit_rate = if stats.total_allocations > 0 {
257 stats.pool_hits as f64 / stats.total_allocations as f64
258 } else {
259 0.0
260 };
261
262 if hit_rate < 0.5 && available.len() > self.pool_size / 2 {
263 let to_remove = available.len() / 4;
265 for _ in 0..to_remove {
266 available.pop_back();
267 }
268 log::debug!(
269 "Memory pool optimized: removed {} unused buffers",
270 to_remove
271 );
272 }
273
274 if stats.total_allocations > 100_000 {
276 let hit_rate_backup = hit_rate;
277 *stats = MemoryPoolStats::default();
278 stats.current_memory_usage = *self.used_buffers.read().await;
279 log::debug!(
280 "Memory pool stats reset. Previous hit rate: {:.2}%",
281 hit_rate_backup * 100.0
282 );
283 }
284
285 Ok(())
286 }
287
288 pub async fn health_check(&self) -> NetworkResult<bool> {
290 let stats = self.stats.read().await;
291 let available_count = self.available_buffers.lock().await.len();
292 let used_count = *self.used_buffers.read().await;
293
294 let is_healthy = used_count + available_count <= self.pool_size
296 && stats.current_memory_usage == used_count;
297
298 if !is_healthy {
299 log::warn!(
300 "Memory pool health check failed: used={}, available={}, total={}",
301 used_count,
302 available_count,
303 self.pool_size
304 );
305 }
306
307 Ok(is_healthy)
308 }
309}
310
/// Accumulates messages into batches and processes them on bounded-parallel
/// background tasks.
pub struct BatchProcessor {
    /// Tuning knobs (batch size/timeout, compression, workers).
    config: PerformanceConfig,
    /// The batch currently being filled, if any.
    current_batch: Arc<Mutex<Option<MessageBatch>>>,
    /// Completed batches awaiting processing.
    batch_queue: Arc<Mutex<VecDeque<MessageBatch>>>,
    /// Caps concurrent batch-processing tasks at `config.parallel_workers`.
    processing_semaphore: Arc<Semaphore>,
    /// Buffer pool used by the zero-copy optimization passes.
    memory_pool: Arc<MemoryPool>,
    /// Running throughput/latency counters.
    stats: Arc<RwLock<BatchProcessorStats>>,
}
326
/// Running counters for [`BatchProcessor`]; snapshot via `get_stats`.
#[derive(Debug, Default, Clone)]
pub struct BatchProcessorStats {
    /// Batches fully processed.
    pub processed_batches: u64,
    /// Messages across all processed batches.
    pub processed_messages: u64,
    /// Running average of messages per batch.
    pub avg_batch_size: f64,
    /// Running average of per-batch processing time in milliseconds.
    pub avg_processing_time_ms: f64,
    /// Compression ratio. NOTE(review): never written by code visible in
    /// this file — confirm it is maintained elsewhere.
    pub compression_ratio: f64,
    /// Throughput. NOTE(review): never written by code visible in this
    /// file — confirm it is maintained elsewhere.
    pub throughput_msgs_per_sec: f64,
}
343
impl BatchProcessor {
    /// Creates a processor whose pool is carved into 1 KiB buffers and whose
    /// parallelism is bounded by `config.parallel_workers` permits.
    pub fn new(config: PerformanceConfig) -> Self {
        // Pool buffer count = total byte budget / 1 KiB per buffer.
        let memory_pool = Arc::new(MemoryPool::new(
            config.memory_pool_size / 1024,
            1024,
        ));

        Self {
            processing_semaphore: Arc::new(Semaphore::new(config.parallel_workers)),
            memory_pool,
            current_batch: Arc::new(Mutex::new(None)),
            batch_queue: Arc::new(Mutex::new(VecDeque::new())),
            stats: Arc::new(RwLock::new(BatchProcessorStats::default())),
            config,
        }
    }

    /// Appends `message` to the in-progress batch, opening one if needed.
    ///
    /// Flushes to the processing queue once the batch reaches
    /// `config.batch_size` messages or `config.batch_timeout` age.
    /// NOTE(review): the timeout is only evaluated when a new message
    /// arrives — an idle partial batch sits until `flush_batch` is called.
    pub async fn add_message(&self, message: ZeroCopyMessage) -> NetworkResult<()> {
        let mut current_batch = self.current_batch.lock().await;

        // Lazily open a new batch on the first message.
        let batch = if let Some(ref mut batch) = *current_batch {
            batch
        } else {
            *current_batch = Some(MessageBatch {
                batch_id: Uuid::new_v4(),
                messages: Vec::new(),
                created_at: Instant::now(),
                total_size: 0,
                is_compressed: false,
            });
            current_batch.as_mut().unwrap()
        };

        batch.total_size += message.payload.len();
        batch.messages.push(message);

        if batch.messages.len() >= self.config.batch_size
            || batch.created_at.elapsed() >= self.config.batch_timeout
        {
            let completed_batch = current_batch.take().unwrap();
            self.batch_queue.lock().await.push_back(completed_batch);

            self.process_next_batch().await?;
        }

        Ok(())
    }

    /// Pops one queued batch and processes it on a spawned task, gated by
    /// the worker semaphore.
    async fn process_next_batch(&self) -> NetworkResult<()> {
        // Hold the queue lock only long enough to pop.
        let batch = {
            let mut queue = self.batch_queue.lock().await;
            queue.pop_front()
        };

        if let Some(batch) = batch {
            // Owned permit moves into the task and is released when the
            // task finishes (on drop).
            let permit = self
                .processing_semaphore
                .clone()
                .acquire_owned()
                .await
                .map_err(|_| {
                    NetworkError::InternalError("Failed to acquire semaphore".to_string())
                })?;

            let stats = self.stats.clone();
            let config = self.config.clone();
            let memory_pool = self.memory_pool.clone();

            tokio::spawn(async move {
                let _permit = permit;
                let start_time = Instant::now();

                match Self::process_batch_internal(batch, &config, &memory_pool).await {
                    Ok(processed_batch) => {
                        let mut stats = stats.write().await;
                        stats.processed_batches += 1;
                        stats.processed_messages += processed_batch.messages.len() as u64;

                        // Incremental (running) average of processing time.
                        let processing_time = start_time.elapsed().as_millis() as f64;
                        stats.avg_processing_time_ms = (stats.avg_processing_time_ms
                            * (stats.processed_batches - 1) as f64
                            + processing_time)
                            / stats.processed_batches as f64;

                        // Incremental average of batch size.
                        let batch_size = processed_batch.messages.len() as f64;
                        stats.avg_batch_size = (stats.avg_batch_size
                            * (stats.processed_batches - 1) as f64
                            + batch_size)
                            / stats.processed_batches as f64;

                        log::debug!(
                            "Batch {} processed successfully in {:.2}ms",
                            processed_batch.batch_id,
                            processing_time
                        );
                    }
                    Err(e) => {
                        // The failed batch is dropped; errors are logged,
                        // not retried.
                        log::error!("Failed to process batch: {}", e);
                    }
                }
            });
        }

        Ok(())
    }

    /// Runs the configured optimization passes over one batch:
    /// compression first (when enabled and above threshold), then the
    /// zero-copy layout passes.
    async fn process_batch_internal(
        mut batch: MessageBatch,
        config: &PerformanceConfig,
        memory_pool: &MemoryPool,
    ) -> NetworkResult<MessageBatch> {
        if config.compression_enabled && batch.total_size > config.compression_threshold {
            batch = Self::compress_batch(batch, config, memory_pool).await?;
        }

        if config.zero_copy_enabled {
            batch = Self::optimize_zero_copy(batch, memory_pool).await?;
        }

        Ok(batch)
    }

    /// Compresses each payload above `compression_threshold` with zlib,
    /// recording original sizes in the per-message metadata.
    ///
    /// NOTE(review): compressed output is kept even when it is larger than
    /// the input, and the reduction log divides by `total_original_size`
    /// (NaN when the batch is empty) — confirm both are acceptable.
    async fn compress_batch(
        mut batch: MessageBatch,
        config: &PerformanceConfig,
        _memory_pool: &MemoryPool,
    ) -> NetworkResult<MessageBatch> {
        let mut compressed_messages = Vec::new();
        let mut total_original_size = 0;
        let mut total_compressed_size = 0;

        for message in batch.messages {
            let original_size = message.payload.len();
            total_original_size += original_size;

            if original_size > config.compression_threshold {
                let compressed_data =
                    Self::compress_data(&message.payload.as_bytes(), config.compression_level)?;
                let compressed_buffer = ZeroCopyBuffer::new(compressed_data);

                let mut compressed_message = message;
                compressed_message.payload = compressed_buffer;
                compressed_message.metadata.compressed = true;
                compressed_message.metadata.original_size = Some(original_size);

                total_compressed_size += compressed_message.payload.len();
                compressed_messages.push(compressed_message);
            } else {
                // Below threshold: pass through uncompressed.
                total_compressed_size += original_size;
                compressed_messages.push(message);
            }
        }

        batch.messages = compressed_messages;
        batch.total_size = total_compressed_size;
        batch.is_compressed = true;

        log::debug!(
            "Batch {} compressed: {} -> {} bytes ({:.1}% reduction)",
            batch.batch_id,
            total_original_size,
            total_compressed_size,
            (1.0 - total_compressed_size as f64 / total_original_size as f64) * 100.0
        );

        Ok(batch)
    }

    /// Applies the layout passes in order: coalesce small messages, group
    /// and compact by topic, align for SIMD, then prefetch-touch payloads.
    async fn optimize_zero_copy(
        mut batch: MessageBatch,
        memory_pool: &MemoryPool,
    ) -> NetworkResult<MessageBatch> {
        let start_time = Instant::now();

        let coalesced_messages = Self::coalesce_messages(batch.messages, memory_pool).await?;

        let optimized_messages = Self::optimize_memory_layout(coalesced_messages, memory_pool).await?;

        let aligned_messages = Self::align_buffers_for_simd(optimized_messages, memory_pool).await?;

        Self::apply_prefetch_optimization(&aligned_messages).await?;

        batch.messages = aligned_messages;

        let optimization_time = start_time.elapsed();
        log::debug!(
            "Zero-copy optimization applied to batch {} with {} messages in {:.2}ms",
            batch.batch_id,
            batch.messages.len(),
            optimization_time.as_millis()
        );

        Ok(batch)
    }

    /// Merges messages smaller than 512 bytes into shared coalesced buffers
    /// (flushed at 64 KiB or 100 messages); larger messages pass through.
    async fn coalesce_messages(
        messages: Vec<ZeroCopyMessage>,
        memory_pool: &MemoryPool,
    ) -> NetworkResult<Vec<ZeroCopyMessage>> {
        const COALESCE_THRESHOLD: usize = 512;
        const MAX_COALESCED_SIZE: usize = 64 * 1024;
        let mut coalesced_messages = Vec::new();
        let mut small_messages = Vec::new();
        let mut current_coalesced_size = 0;

        for message in messages {
            if message.payload.len() < COALESCE_THRESHOLD {
                current_coalesced_size += message.payload.len();
                small_messages.push(message);

                // Flush the pending group when it gets big enough.
                if current_coalesced_size >= MAX_COALESCED_SIZE || small_messages.len() >= 100 {
                    let coalesced = Self::create_coalesced_buffer(small_messages, memory_pool).await?;
                    coalesced_messages.push(coalesced);
                    small_messages = Vec::new();
                    current_coalesced_size = 0;
                }
            } else {
                // Large messages are left untouched.
                coalesced_messages.push(message);
            }
        }

        // Flush the trailing partial group.
        if !small_messages.is_empty() {
            let coalesced = Self::create_coalesced_buffer(small_messages, memory_pool).await?;
            coalesced_messages.push(coalesced);
        }

        Ok(coalesced_messages)
    }

    /// Packs a group of small messages into one buffer as
    /// `[u32 LE length][16-byte uuid][payload]` records, returning a single
    /// representative message whose metadata is taken from the first input.
    ///
    /// NOTE(review): each record adds 20 bytes of framing but only 8 bytes
    /// per message are reserved up-front, so the buffer may reallocate; the
    /// pooled buffer is also never handed back via `release_buffer`, so the
    /// pool drains over time — confirm both.
    async fn create_coalesced_buffer(
        messages: Vec<ZeroCopyMessage>,
        memory_pool: &MemoryPool,
    ) -> NetworkResult<ZeroCopyMessage> {
        let total_size = messages.iter().map(|m| m.payload.len()).sum::<usize>();
        let mut buffer = memory_pool.acquire_buffer(total_size + messages.len() * 8).await?;
        for message in &messages {
            buffer.extend_from_slice(&(message.payload.len() as u32).to_le_bytes());
            buffer.extend_from_slice(&message.id.as_bytes()[..]);
            buffer.extend_from_slice(&message.payload.as_bytes());
        }

        let coalesced_payload = ZeroCopyBuffer::new(buffer.freeze());

        let representative_message = ZeroCopyMessage {
            id: Uuid::new_v4(),
            headers: {
                let mut headers = HashMap::new();
                headers.insert("coalesced_count".to_string(), messages.len().to_string());
                headers.insert("optimization_type".to_string(), "message_coalescing".to_string());
                headers
            },
            payload: coalesced_payload,
            timestamp: messages.first().map(|m| m.timestamp).unwrap_or(0),
            metadata: MessageMetadata {
                topic: messages.first()
                    .map(|m| m.metadata.topic.clone())
                    .unwrap_or_else(|| "coalesced".to_string()),
                partition: messages.first().map(|m| m.metadata.partition).unwrap_or(0),
                offset: messages.first().map(|m| m.metadata.offset).unwrap_or(0),
                compressed: false,
                original_size: Some(total_size),
            },
        };

        Ok(representative_message)
    }

    /// Groups messages by topic and compacts small groups (>10 messages,
    /// <32 KiB total) into contiguous buffers.
    ///
    /// NOTE(review): the initial descending sort is immediately superseded
    /// by the per-topic ascending sort, and HashMap iteration makes the
    /// resulting topic order non-deterministic — confirm callers don't
    /// depend on message order.
    async fn optimize_memory_layout(
        messages: Vec<ZeroCopyMessage>,
        memory_pool: &MemoryPool,
    ) -> NetworkResult<Vec<ZeroCopyMessage>> {
        let mut sorted_messages = messages;
        sorted_messages.sort_by(|a, b| b.payload.len().cmp(&a.payload.len()));

        let mut topic_groups: HashMap<String, Vec<ZeroCopyMessage>> = HashMap::new();
        for message in sorted_messages {
            topic_groups
                .entry(message.metadata.topic.clone())
                .or_insert_with(Vec::new)
                .push(message);
        }

        let mut optimized_messages = Vec::new();
        for (_topic, mut group) in topic_groups {
            group.sort_by(|a, b| a.payload.len().cmp(&b.payload.len()));

            if group.len() > 10 && group.iter().map(|m| m.payload.len()).sum::<usize>() < 32 * 1024 {
                let compacted = Self::compact_memory_layout(group, memory_pool).await?;
                optimized_messages.extend(compacted);
            } else {
                optimized_messages.extend(group);
            }
        }

        Ok(optimized_messages)
    }

    /// Copies a group's payloads into one buffer with 8-byte record padding,
    /// re-pointing each message at its slice of the shared storage.
    ///
    /// NOTE(review): `compact_buffer.clone().freeze()` snapshots the whole
    /// buffer once PER message — O(n²) copying that defeats the zero-copy
    /// goal, and each message ends up backed by a different clone. A single
    /// freeze after the loop would share one allocation — confirm and fix.
    async fn compact_memory_layout(
        messages: Vec<ZeroCopyMessage>,
        memory_pool: &MemoryPool,
    ) -> NetworkResult<Vec<ZeroCopyMessage>> {
        const ALIGNMENT: usize = 64;
        let total_size = messages.iter().map(|m| m.payload.len()).sum::<usize>();
        // Round the whole buffer up to a 64-byte multiple.
        let aligned_size = (total_size + ALIGNMENT - 1) & !(ALIGNMENT - 1);

        let mut compact_buffer = memory_pool.acquire_buffer(aligned_size).await?;
        let mut offset = 0;
        let mut compacted_messages = Vec::new();

        for message in messages {
            let payload_size = message.payload.len();
            // Each record is padded to an 8-byte boundary.
            let aligned_payload_size = (payload_size + 7) & !7;
            compact_buffer.extend_from_slice(&message.payload.as_bytes());

            let padding = aligned_payload_size - payload_size;
            if padding > 0 {
                compact_buffer.extend_from_slice(&vec![0u8; padding]);
            }

            let compacted_payload = ZeroCopyBuffer::new(
                compact_buffer.clone().freeze().slice(offset..offset + payload_size)
            );

            let mut compacted_message = message;
            compacted_message.payload = compacted_payload;
            compacted_message.headers.insert(
                "memory_compacted".to_string(),
                "true".to_string()
            );

            compacted_messages.push(compacted_message);
            offset += aligned_payload_size;
        }

        Ok(compacted_messages)
    }

    /// Re-copies payloads whose start address is not 32-byte aligned into
    /// fresh zero-padded buffers, tagging them with a "simd_aligned" header.
    ///
    /// NOTE(review): the replacement buffer comes from a plain heap
    /// allocation whose base address is not guaranteed to be 32-byte
    /// aligned either — confirm the alignment actually holds in practice.
    async fn align_buffers_for_simd(
        messages: Vec<ZeroCopyMessage>,
        memory_pool: &MemoryPool,
    ) -> NetworkResult<Vec<ZeroCopyMessage>> {
        const SIMD_ALIGNMENT: usize = 32;
        let mut aligned_messages = Vec::new();

        for message in messages {
            let payload_bytes = message.payload.as_bytes();
            let payload_size = payload_bytes.len();

            let is_aligned = payload_bytes.as_ptr() as usize % SIMD_ALIGNMENT == 0;

            if is_aligned || payload_size < SIMD_ALIGNMENT {
                // Already aligned, or too small to benefit: pass through.
                aligned_messages.push(message);
            } else {
                let aligned_size = (payload_size + SIMD_ALIGNMENT - 1) & !(SIMD_ALIGNMENT - 1);
                let mut aligned_buffer = memory_pool.acquire_buffer(aligned_size).await?;

                aligned_buffer.extend_from_slice(&payload_bytes);

                // Zero-pad up to the aligned size; the view below still
                // covers only the original payload bytes.
                let padding = aligned_size - payload_size;
                if padding > 0 {
                    aligned_buffer.extend_from_slice(&vec![0u8; padding]);
                }

                let aligned_payload = ZeroCopyBuffer::new(
                    aligned_buffer.freeze().slice(0..payload_size)
                );

                let mut aligned_message = message;
                aligned_message.payload = aligned_payload;
                aligned_message.headers.insert(
                    "simd_aligned".to_string(),
                    "true".to_string()
                );

                aligned_messages.push(aligned_message);
            }
        }

        Ok(aligned_messages)
    }

    /// Touches the first byte of the payload two messages ahead as a
    /// software prefetch hint (`black_box` keeps the read from being
    /// optimized away). Best-effort only; never fails.
    async fn apply_prefetch_optimization(
        messages: &[ZeroCopyMessage],
    ) -> NetworkResult<()> {
        for (i, message) in messages.iter().enumerate() {
            if i + 2 < messages.len() {
                let next_payload = &messages[i + 2].payload.as_bytes();
                if !next_payload.is_empty() {
                    if let Some(b) = next_payload.get(0) {
                        std::hint::black_box(*b);
                    }

                    let _ = next_payload.len();
                }
            }

            let _ = message.payload.len();
        }

        Ok(())
    }

    /// zlib-compresses `data` at the given level.
    fn compress_data(data: &Bytes, level: Compression) -> NetworkResult<Bytes> {
        let mut encoder = ZlibEncoder::new(Vec::new(), level);
        encoder
            .write_all(data)
            .map_err(|e| NetworkError::Io(format!("Compression failed: {}", e)))?;
        let compressed = encoder
            .finish()
            .map_err(|e| NetworkError::Io(format!("Compression finalization failed: {}", e)))?;
        Ok(Bytes::from(compressed))
    }

    /// Inverse of [`Self::compress_data`].
    pub fn decompress_data(data: &Bytes) -> NetworkResult<Bytes> {
        let mut decoder = ZlibDecoder::new(data.as_ref());
        let mut decompressed = Vec::new();
        decoder
            .read_to_end(&mut decompressed)
            .map_err(|e| NetworkError::Io(format!("Decompression failed: {}", e)))?;
        Ok(Bytes::from(decompressed))
    }

    /// Snapshot of the running counters.
    pub async fn get_stats(&self) -> BatchProcessorStats {
        self.stats.read().await.clone()
    }

    /// Inspects the counters and periodically resets them.
    ///
    /// NOTE(review): the batch-size branches only log — `self.config` is
    /// not mutable here, so no tuning actually changes. Confirm intent.
    pub async fn optimize(&self) -> NetworkResult<()> {
        let mut stats = self.stats.write().await;

        if stats.throughput_msgs_per_sec < 100.0 && self.config.batch_size > 10 {
            log::debug!("Reducing batch size to improve throughput");
        } else if stats.throughput_msgs_per_sec > 1000.0 && self.config.batch_size < 10000 {
            log::debug!("Increasing batch size to improve efficiency");
        }

        if stats.compression_ratio < 1.1 {
            log::debug!(
                "Low compression efficiency detected: {:.2}x",
                stats.compression_ratio
            );
        }

        // Periodic reset keeps the running averages responsive.
        if stats.processed_messages > 1_000_000 {
            let throughput_backup = stats.throughput_msgs_per_sec;
            let compression_backup = stats.compression_ratio;
            *stats = BatchProcessorStats::default();
            log::debug!(
                "Batch processor stats reset. Previous throughput: {:.2} msg/s, compression: {:.2}x",
                throughput_backup,
                compression_backup
            );
        }

        Ok(())
    }

    /// Healthy when the backlog is under 1000 batches and the in-progress
    /// batch has not exceeded the configured batch size.
    pub async fn health_check(&self) -> NetworkResult<bool> {
        let queue_size = self.batch_queue.lock().await.len();
        let current_batch_size = if let Some(ref batch) = *self.current_batch.lock().await {
            batch.messages.len()
        } else {
            0
        };

        let is_healthy = queue_size < 1000 && current_batch_size <= self.config.batch_size;

        if !is_healthy {
            log::warn!(
                "Batch processor health check failed: queue_size={}, current_batch_size={}",
                queue_size,
                current_batch_size
            );
        }

        Ok(is_healthy)
    }

    /// Force-flushes the in-progress batch (if any) to the queue and
    /// processes it, regardless of size or age.
    pub async fn flush_batch(&self) -> NetworkResult<()> {
        let batch = self.current_batch.lock().await.take();
        if let Some(batch) = batch {
            self.batch_queue.lock().await.push_back(batch);
            self.process_next_batch().await?;
        }
        Ok(())
    }
}
899
/// High-level facade tying together batching, pooling, and metrics export.
///
/// NOTE(review): `memory_pool` here is a second pool, independent of the
/// one `BatchProcessor::new` creates internally — confirm this duplication
/// is intentional.
pub struct PerformanceOptimizer {
    /// Tuning knobs shared with the batch processor at construction time.
    config: PerformanceConfig,
    /// Batching/compression pipeline.
    batch_processor: Arc<BatchProcessor>,
    /// Buffer pool used for metrics only in the code visible here.
    memory_pool: Arc<MemoryPool>,
    /// Aggregated performance counters.
    stats: Arc<RwLock<PerformanceStats>>,
}
911
/// Aggregated counters exposed by [`PerformanceOptimizer`].
#[derive(Debug, Clone, Default)]
pub struct PerformanceStats {
    /// Messages passed through `optimize_message`.
    pub total_messages_processed: u64,
    /// Running average of per-message optimization latency in microseconds.
    pub avg_latency_us: f64,
    /// Copied from the batch processor's counters.
    pub throughput_msgs_per_sec: f64,
    /// Memory-pool hit rate (hits / total allocations).
    pub memory_efficiency: f64,
    /// Copied from the batch processor's compression ratio.
    pub compression_efficiency: f64,
    /// CPU usage. NOTE(review): fed by `calculate_cpu_usage`, which
    /// currently returns a random placeholder value.
    pub cpu_usage_percent: f64,
}
928
impl PerformanceOptimizer {
    /// Convenience constructor; fills the remaining config fields with
    /// fixed defaults (fast zlib, 10 ms timeout, 4 workers).
    pub fn new(memory_pool_size: usize, batch_size: usize, compression_enabled: bool) -> Self {
        let config = PerformanceConfig {
            zero_copy_enabled: true,
            batch_processing_enabled: true,
            compression_enabled,
            batch_size,
            batch_timeout: Duration::from_millis(10),
            compression_threshold: 1024,
            compression_level: Compression::fast(),
            parallel_workers: 4,
            memory_pool_size,
            prefetch_size: 64,
        };

        Self::with_config(config)
    }

    /// Builds the optimizer from an explicit config.
    pub fn with_config(config: PerformanceConfig) -> Self {
        let batch_processor = Arc::new(BatchProcessor::new(config.clone()));
        // Separate pool from the batch processor's internal one (see the
        // struct-level note); same 1 KiB carving scheme.
        let memory_pool = Arc::new(MemoryPool::new(config.memory_pool_size / 1024, 1024));

        Self {
            config,
            batch_processor,
            memory_pool,
            stats: Arc::new(RwLock::new(PerformanceStats::default())),
        }
    }

    /// Converts a `Message` into a [`ZeroCopyMessage`], optionally feeding
    /// it into the batch pipeline, and updates the latency average.
    ///
    /// The clone passed to `add_message` is cheap for the payload (shared
    /// storage) but copies the (empty) header map.
    pub async fn optimize_message(&self, message: Message) -> NetworkResult<ZeroCopyMessage> {
        let start_time = Instant::now();

        // `into_bytes` consumes the message content without copying it.
        let payload_bytes = Bytes::from(message.content.into_bytes());
        let zero_copy_message = ZeroCopyMessage {
            id: message.id,
            headers: HashMap::new(),
            payload: ZeroCopyBuffer::new(payload_bytes),
            timestamp: message.timestamp.timestamp() as u64,
            metadata: MessageMetadata {
                topic: message.topic_id,
                partition: message.partition_id as u32,
                // Offset is not known at this point.
                offset: 0,
                compressed: false,
                original_size: None,
            },
        };

        if self.config.batch_processing_enabled {
            self.batch_processor
                .add_message(zero_copy_message.clone())
                .await?;
        }

        let mut stats = self.stats.write().await;
        stats.total_messages_processed += 1;

        // Incremental (running) average of per-message latency.
        let latency = start_time.elapsed().as_micros() as f64;
        stats.avg_latency_us = (stats.avg_latency_us * (stats.total_messages_processed - 1) as f64
            + latency)
            / stats.total_messages_processed as f64;

        Ok(zero_copy_message)
    }

    /// Snapshot of the aggregated stats, refreshed from the batch
    /// processor and memory pool counters.
    pub async fn get_metrics(&self) -> PerformanceStats {
        let mut stats = self.stats.read().await.clone();

        let batch_stats = self.batch_processor.get_stats().await;
        stats.throughput_msgs_per_sec = batch_stats.throughput_msgs_per_sec;
        stats.compression_efficiency = batch_stats.compression_ratio;

        // Memory efficiency = pool hit rate.
        let memory_stats = self.memory_pool.get_stats().await;
        if memory_stats.total_allocations > 0 {
            stats.memory_efficiency =
                memory_stats.pool_hits as f64 / memory_stats.total_allocations as f64;
        }

        stats
    }

    /// Alias for [`Self::get_metrics`], kept for API compatibility.
    pub async fn get_performance_stats(&self) -> PerformanceStats {
        self.get_metrics().await
    }

    /// Replaces the stored config.
    ///
    /// NOTE(review): the already-built `batch_processor` and `memory_pool`
    /// keep their original config/sizing — only fields read directly from
    /// `self.config` are affected. Confirm this is the intended semantics.
    pub async fn update_config(&mut self, new_config: PerformanceConfig) {
        self.config = new_config;
    }

    /// Placeholder: currently only logs; no memory work is performed here.
    pub async fn optimize_memory_usage(&self) -> NetworkResult<()> {
        log::info!("Memory optimization completed");
        Ok(())
    }

    /// Spawns a background task that refreshes the aggregated stats every
    /// second. The task runs forever; there is no shutdown handle.
    pub async fn start_real_time_monitoring(&self) -> NetworkResult<()> {
        let stats = Arc::clone(&self.stats);
        let memory_pool = Arc::clone(&self.memory_pool);
        let batch_processor = Arc::clone(&self.batch_processor);

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(1));
            loop {
                interval.tick().await;

                let mut stats_guard = stats.write().await;
                let memory_stats = memory_pool.get_stats().await;
                let batch_stats = batch_processor.get_stats().await;

                // See calculate_cpu_usage: placeholder value, not a real
                // CPU measurement.
                stats_guard.cpu_usage_percent = Self::calculate_cpu_usage();

                if memory_stats.total_allocations > 0 {
                    stats_guard.memory_efficiency =
                        memory_stats.pool_hits as f64 / memory_stats.total_allocations as f64;
                }

                stats_guard.compression_efficiency = batch_stats.compression_ratio;

                stats_guard.throughput_msgs_per_sec = batch_stats.throughput_msgs_per_sec;

                log::debug!(
                    "Performance stats updated: CPU: {:.2}%, Memory efficiency: {:.2}%, Compression: {:.2}x",
                    stats_guard.cpu_usage_percent,
                    stats_guard.memory_efficiency,
                    stats_guard.compression_efficiency
                );
            }
        });

        Ok(())
    }

    /// PLACEHOLDER: returns a random number in `[0, 100)`, not an actual
    /// CPU measurement. Replace with a real probe (e.g. /proc/stat or a
    /// sysinfo crate) before trusting exported CPU metrics.
    fn calculate_cpu_usage() -> f64 {
        use rand::Rng;
        let mut rng = rand::thread_rng();
        rng.gen_range(0.0..100.0)
    }

    /// Renders the current counters in Prometheus text exposition format.
    pub async fn export_metrics(&self) -> NetworkResult<String> {
        let stats = self.stats.read().await;
        let memory_stats = self.memory_pool.get_stats().await;
        let batch_stats = self.batch_processor.get_stats().await;

        let metrics = format!(
            "# HELP pilgrimage_messages_processed_total Total number of messages processed\n\
            # TYPE pilgrimage_messages_processed_total counter\n\
            pilgrimage_messages_processed_total {}\n\
            \n\
            # HELP pilgrimage_avg_latency_microseconds Average latency in microseconds\n\
            # TYPE pilgrimage_avg_latency_microseconds gauge\n\
            pilgrimage_avg_latency_microseconds {}\n\
            \n\
            # HELP pilgrimage_throughput_messages_per_second Throughput in messages per second\n\
            # TYPE pilgrimage_throughput_messages_per_second gauge\n\
            pilgrimage_throughput_messages_per_second {}\n\
            \n\
            # HELP pilgrimage_memory_efficiency Memory pool hit rate\n\
            # TYPE pilgrimage_memory_efficiency gauge\n\
            pilgrimage_memory_efficiency {}\n\
            \n\
            # HELP pilgrimage_compression_ratio Compression efficiency ratio\n\
            # TYPE pilgrimage_compression_ratio gauge\n\
            pilgrimage_compression_ratio {}\n\
            \n\
            # HELP pilgrimage_cpu_usage_percent CPU usage percentage\n\
            # TYPE pilgrimage_cpu_usage_percent gauge\n\
            pilgrimage_cpu_usage_percent {}\n\
            \n\
            # HELP pilgrimage_memory_pool_hits_total Memory pool hits\n\
            # TYPE pilgrimage_memory_pool_hits_total counter\n\
            pilgrimage_memory_pool_hits_total {}\n\
            \n\
            # HELP pilgrimage_batch_messages_processed_total Batch messages processed\n\
            # TYPE pilgrimage_batch_messages_processed_total counter\n\
            pilgrimage_batch_messages_processed_total {}\n",
            stats.total_messages_processed,
            stats.avg_latency_us,
            stats.throughput_msgs_per_sec,
            stats.memory_efficiency,
            stats.compression_efficiency,
            stats.cpu_usage_percent,
            memory_stats.pool_hits,
            batch_stats.processed_messages
        );

        Ok(metrics)
    }

    /// Runs each component's `optimize` pass and resets the aggregated
    /// stats once 1M messages have been processed.
    pub async fn run_optimization_cycle(&self) -> NetworkResult<()> {
        log::info!("Starting performance optimization cycle");

        self.memory_pool.optimize().await?;

        self.batch_processor.optimize().await?;

        self.optimize_memory_usage().await?;

        let mut stats = self.stats.write().await;
        if stats.total_messages_processed > 1_000_000 {
            *stats = PerformanceStats::default();
            log::info!("Performance statistics reset after processing 1M messages");
        }

        log::info!("Performance optimization cycle completed");
        Ok(())
    }

    /// Flattens all counters into a `name -> f64` map for ad-hoc consumers.
    pub async fn get_detailed_metrics(&self) -> NetworkResult<HashMap<String, f64>> {
        let stats = self.stats.read().await;
        let memory_stats = self.memory_pool.get_stats().await;
        let batch_stats = self.batch_processor.get_stats().await;

        let mut metrics = HashMap::new();

        // Optimizer-level counters.
        metrics.insert(
            "total_messages_processed".to_string(),
            stats.total_messages_processed as f64,
        );
        metrics.insert("avg_latency_us".to_string(), stats.avg_latency_us);
        metrics.insert(
            "throughput_msgs_per_sec".to_string(),
            stats.throughput_msgs_per_sec,
        );
        metrics.insert("memory_efficiency".to_string(), stats.memory_efficiency);
        metrics.insert(
            "compression_efficiency".to_string(),
            stats.compression_efficiency,
        );
        metrics.insert("cpu_usage_percent".to_string(), stats.cpu_usage_percent);

        // Memory-pool counters.
        metrics.insert(
            "memory_pool_hits".to_string(),
            memory_stats.pool_hits as f64,
        );
        metrics.insert(
            "memory_pool_misses".to_string(),
            memory_stats.pool_misses as f64,
        );
        metrics.insert(
            "memory_pool_total_allocations".to_string(),
            memory_stats.total_allocations as f64,
        );
        metrics.insert(
            "memory_pool_current_usage".to_string(),
            memory_stats.current_memory_usage as f64,
        );

        // Batch-processor counters.
        metrics.insert(
            "batch_messages_processed".to_string(),
            batch_stats.processed_messages as f64,
        );
        metrics.insert(
            "batch_avg_batch_size".to_string(),
            batch_stats.avg_batch_size,
        );
        metrics.insert(
            "batch_processing_time_ms".to_string(),
            batch_stats.avg_processing_time_ms,
        );
        metrics.insert(
            "batch_compression_ratio".to_string(),
            batch_stats.compression_ratio,
        );

        Ok(metrics)
    }
}
1228
#[cfg(test)]
mod tests {
    use super::*;

    /// Slicing a buffer shares storage and reports correct length/content.
    #[tokio::test]
    async fn test_zero_copy_buffer() {
        let data = Bytes::from("Hello, World!");
        let buffer = ZeroCopyBuffer::new(data);

        assert_eq!(buffer.len(), 13);
        assert!(!buffer.is_empty());

        let slice = buffer.slice(0, 5).unwrap();
        assert_eq!(slice.as_bytes(), Bytes::from("Hello"));
    }

    /// Acquire/release round-trip updates the allocation counters and
    /// serves pool-sized buffers.
    #[tokio::test]
    async fn test_memory_pool() {
        let pool = MemoryPool::new(10, 1024);

        let buffer1 = pool.acquire_buffer(512).await.unwrap();
        let buffer2 = pool.acquire_buffer(1024).await.unwrap();

        // Both requests fit in the pool's 1024-byte buffers.
        assert_eq!(buffer1.capacity(), 1024);
        assert_eq!(buffer2.capacity(), 1024);

        pool.release_buffer(buffer1).await;
        pool.release_buffer(buffer2).await;

        let stats = pool.get_stats().await;
        assert_eq!(stats.total_allocations, 2);
        assert_eq!(stats.total_deallocations, 2);
    }

    /// zlib round-trip restores the input and actually shrinks
    /// highly-repetitive data.
    #[tokio::test]
    async fn test_compression() {
        let original_data = Bytes::from("This is a test message for compression. ".repeat(100));
        let compressed =
            BatchProcessor::compress_data(&original_data, Compression::fast()).unwrap();
        let decompressed = BatchProcessor::decompress_data(&compressed).unwrap();

        assert_eq!(original_data, decompressed);
        assert!(compressed.len() < original_data.len());
    }
}
1273}