#![allow(dead_code)]
use crate::protocol::{Message, PartitionId, TopicName};
use crate::Result;
use bytes::{Bytes, BytesMut};
use parking_lot::RwLock;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
pub struct BatchProcessor {
max_batch_size: usize,
max_batch_timeout_ms: u64,
batches: Arc<RwLock<HashMap<(TopicName, PartitionId), VecDeque<Message>>>>,
processed_batches: AtomicUsize,
total_messages: AtomicUsize,
batch_sizes: Arc<RwLock<Vec<usize>>>,
}
impl BatchProcessor {
pub fn new() -> Self {
Self {
max_batch_size: 10000, max_batch_timeout_ms: 10, batches: Arc::new(RwLock::new(HashMap::new())),
processed_batches: AtomicUsize::new(0),
total_messages: AtomicUsize::new(0),
batch_sizes: Arc::new(RwLock::new(Vec::with_capacity(10000))),
}
}
pub fn add_message(&self, topic: &str, partition: PartitionId, message: Message) {
let mut batches = self.batches.write();
let key = (topic.to_string(), partition);
let queue = batches.entry(key).or_insert_with(VecDeque::new);
queue.push_back(message);
if queue.len() >= self.max_batch_size {
self.total_messages
.fetch_add(queue.len(), Ordering::Relaxed);
}
}
pub fn extract_ready_batches(&self) -> Vec<(TopicName, PartitionId, Vec<Message>)> {
let mut batches = self.batches.write();
let mut ready_batches = Vec::new();
for ((topic, partition), queue) in batches.iter_mut() {
if !queue.is_empty() && queue.len() >= 100 {
let batch: Vec<Message> = queue.drain(..).collect();
let batch_size = batch.len();
ready_batches.push((topic.clone(), *partition, batch));
self.batch_sizes.write().push(batch_size);
self.processed_batches.fetch_add(1, Ordering::Relaxed);
}
}
ready_batches
}
pub fn get_stats(&self) -> BatchStats {
let batch_sizes = self.batch_sizes.read();
let avg_batch_size = if batch_sizes.is_empty() {
0.0
} else {
batch_sizes.iter().sum::<usize>() as f64 / batch_sizes.len() as f64
};
let max_batch_size = batch_sizes.iter().max().copied().unwrap_or(0);
let min_batch_size = batch_sizes.iter().min().copied().unwrap_or(0);
BatchStats {
processed_batches: self.processed_batches.load(Ordering::Relaxed),
total_messages: self.total_messages.load(Ordering::Relaxed),
avg_batch_size,
max_batch_size,
min_batch_size,
}
}
}
#[derive(Debug, Clone)]
pub struct BatchStats {
pub processed_batches: usize,
pub total_messages: usize,
pub avg_batch_size: f64,
pub max_batch_size: usize,
pub min_batch_size: usize,
}
pub struct ZeroCopyBufferManager {
small_buffers: Arc<RwLock<Vec<BytesMut>>>, medium_buffers: Arc<RwLock<Vec<BytesMut>>>, large_buffers: Arc<RwLock<Vec<BytesMut>>>,
allocations: AtomicUsize,
deallocations: AtomicUsize,
reuse_count: AtomicUsize,
}
impl ZeroCopyBufferManager {
pub fn new() -> Self {
let mut manager = Self {
small_buffers: Arc::new(RwLock::new(Vec::with_capacity(1000))),
medium_buffers: Arc::new(RwLock::new(Vec::with_capacity(500))),
large_buffers: Arc::new(RwLock::new(Vec::with_capacity(100))),
allocations: AtomicUsize::new(0),
deallocations: AtomicUsize::new(0),
reuse_count: AtomicUsize::new(0),
};
manager.pre_allocate_buffers();
manager
}
fn pre_allocate_buffers(&mut self) {
{
let mut small = self.small_buffers.write();
for _ in 0..500 {
small.push(BytesMut::with_capacity(1024));
}
}
{
let mut medium = self.medium_buffers.write();
for _ in 0..200 {
medium.push(BytesMut::with_capacity(16384));
}
}
{
let mut large = self.large_buffers.write();
for _ in 0..50 {
large.push(BytesMut::with_capacity(262144));
}
}
}
pub fn get_buffer(&self, size: usize) -> BytesMut {
self.allocations.fetch_add(1, Ordering::Relaxed);
let buffer = if size <= 1024 {
if let Some(buf) = self.small_buffers.write().pop() {
self.reuse_count.fetch_add(1, Ordering::Relaxed);
buf
} else {
BytesMut::with_capacity(1024)
}
} else if size <= 16384 {
if let Some(buf) = self.medium_buffers.write().pop() {
self.reuse_count.fetch_add(1, Ordering::Relaxed);
buf
} else {
BytesMut::with_capacity(16384)
}
} else if size <= 262144 {
if let Some(buf) = self.large_buffers.write().pop() {
self.reuse_count.fetch_add(1, Ordering::Relaxed);
buf
} else {
BytesMut::with_capacity(262144)
}
} else {
BytesMut::with_capacity(size)
};
buffer
}
pub fn return_buffer(&self, mut buffer: BytesMut) {
self.deallocations.fetch_add(1, Ordering::Relaxed);
buffer.clear();
let capacity = buffer.capacity();
if capacity >= 900 && capacity <= 1100 {
if self.small_buffers.read().len() < 1000 {
self.small_buffers.write().push(buffer);
}
} else if capacity >= 15000 && capacity <= 17000 {
if self.medium_buffers.read().len() < 500 {
self.medium_buffers.write().push(buffer);
}
} else if capacity >= 250000 && capacity <= 270000 {
if self.large_buffers.read().len() < 100 {
self.large_buffers.write().push(buffer);
}
}
}
pub fn get_stats(&self) -> BufferStats {
BufferStats {
allocations: self.allocations.load(Ordering::Relaxed),
deallocations: self.deallocations.load(Ordering::Relaxed),
reuse_count: self.reuse_count.load(Ordering::Relaxed),
small_pool_size: self.small_buffers.read().len(),
medium_pool_size: self.medium_buffers.read().len(),
large_pool_size: self.large_buffers.read().len(),
}
}
}
#[derive(Debug, Clone)]
pub struct BufferStats {
pub allocations: usize,
pub deallocations: usize,
pub reuse_count: usize,
pub small_pool_size: usize,
pub medium_pool_size: usize,
pub large_pool_size: usize,
}
impl BufferStats {
pub fn reuse_efficiency(&self) -> f64 {
if self.allocations == 0 {
0.0
} else {
self.reuse_count as f64 / self.allocations as f64
}
}
}
pub struct VectoredIOManager {
pending_writes: Arc<RwLock<HashMap<String, Vec<Bytes>>>>,
vectored_writes: AtomicUsize,
bytes_written: AtomicUsize,
write_operations: AtomicUsize,
}
impl VectoredIOManager {
pub fn new() -> Self {
Self {
pending_writes: Arc::new(RwLock::new(HashMap::new())),
vectored_writes: AtomicUsize::new(0),
bytes_written: AtomicUsize::new(0),
write_operations: AtomicUsize::new(0),
}
}
pub fn queue_write(&self, file_path: &str, data: Bytes) {
let mut pending = self.pending_writes.write();
pending
.entry(file_path.to_string())
.or_insert_with(Vec::new)
.push(data);
}
pub async fn flush_pending_writes(&self) -> Result<usize> {
let mut pending = self.pending_writes.write();
let mut total_bytes = 0;
for (file_path, data_vec) in pending.drain() {
if !data_vec.is_empty() {
let batch_size: usize = data_vec.iter().map(|d| d.len()).sum();
tokio::fs::write(&file_path, &data_vec.concat())
.await
.map_err(|e| crate::FluxmqError::Storage(e))?;
total_bytes += batch_size;
self.vectored_writes.fetch_add(1, Ordering::Relaxed);
self.bytes_written.fetch_add(batch_size, Ordering::Relaxed);
self.write_operations
.fetch_add(data_vec.len(), Ordering::Relaxed);
}
}
Ok(total_bytes)
}
pub fn get_stats(&self) -> IOStats {
let vectored_writes = self.vectored_writes.load(Ordering::Relaxed);
let bytes_written = self.bytes_written.load(Ordering::Relaxed);
let write_operations = self.write_operations.load(Ordering::Relaxed);
IOStats {
vectored_writes,
bytes_written,
write_operations,
avg_batch_size: if vectored_writes > 0 {
write_operations as f64 / vectored_writes as f64
} else {
0.0
},
avg_bytes_per_write: if write_operations > 0 {
bytes_written as f64 / write_operations as f64
} else {
0.0
},
}
}
}
#[derive(Debug, Clone)]
pub struct IOStats {
pub vectored_writes: usize,
pub bytes_written: usize,
pub write_operations: usize,
pub avg_batch_size: f64,
pub avg_bytes_per_write: f64,
}
pub struct ConnectionPool {
max_connections: usize,
connection_timeout_ms: u64,
active_connections: AtomicUsize,
total_connections: AtomicUsize,
reused_connections: AtomicUsize,
}
impl ConnectionPool {
pub fn new(max_connections: usize) -> Self {
Self {
max_connections,
connection_timeout_ms: 30000, active_connections: AtomicUsize::new(0),
total_connections: AtomicUsize::new(0),
reused_connections: AtomicUsize::new(0),
}
}
pub fn connection_opened(&self) {
self.active_connections.fetch_add(1, Ordering::Relaxed);
self.total_connections.fetch_add(1, Ordering::Relaxed);
}
pub fn connection_closed(&self) {
self.active_connections.fetch_sub(1, Ordering::Relaxed);
}
pub fn connection_reused(&self) {
self.reused_connections.fetch_add(1, Ordering::Relaxed);
}
pub fn can_accept_connection(&self) -> bool {
self.active_connections.load(Ordering::Relaxed) < self.max_connections
}
pub fn get_stats(&self) -> ConnectionStats {
let active = self.active_connections.load(Ordering::Relaxed);
let total = self.total_connections.load(Ordering::Relaxed);
let reused = self.reused_connections.load(Ordering::Relaxed);
ConnectionStats {
active_connections: active,
total_connections: total,
reused_connections: reused,
max_connections: self.max_connections,
utilization: if self.max_connections > 0 {
active as f64 / self.max_connections as f64
} else {
0.0
},
reuse_rate: if total > 0 {
reused as f64 / total as f64
} else {
0.0
},
}
}
}
#[derive(Debug, Clone)]
pub struct ConnectionStats {
pub active_connections: usize,
pub total_connections: usize,
pub reused_connections: usize,
pub max_connections: usize,
pub utilization: f64,
pub reuse_rate: f64,
}
pub struct IOOptimizationManager {
pub batch_processor: BatchProcessor,
pub buffer_manager: ZeroCopyBufferManager,
pub io_manager: VectoredIOManager,
pub connection_pool: ConnectionPool,
}
impl IOOptimizationManager {
pub fn new() -> Self {
Self {
batch_processor: BatchProcessor::new(),
buffer_manager: ZeroCopyBufferManager::new(),
io_manager: VectoredIOManager::new(),
connection_pool: ConnectionPool::new(10000), }
}
pub fn get_comprehensive_stats(&self) -> IOOptimizationStats {
IOOptimizationStats {
batch_stats: self.batch_processor.get_stats(),
buffer_stats: self.buffer_manager.get_stats(),
io_stats: self.io_manager.get_stats(),
connection_stats: self.connection_pool.get_stats(),
}
}
}
#[derive(Debug, Clone)]
pub struct IOOptimizationStats {
pub batch_stats: BatchStats,
pub buffer_stats: BufferStats,
pub io_stats: IOStats,
pub connection_stats: ConnectionStats,
}
impl IOOptimizationStats {
pub fn report(&self) -> String {
format!(
"I/O Optimization Stats:\n\
Batching: {} batches, avg size {:.1}\n\
Buffers: {:.1}% reuse rate, {} pools\n\
I/O: {} vectored writes, {:.1} avg batch size\n\
Connections: {} active ({:.1}% util), {:.1}% reuse rate",
self.batch_stats.processed_batches,
self.batch_stats.avg_batch_size,
self.buffer_stats.reuse_efficiency() * 100.0,
self.buffer_stats.small_pool_size
+ self.buffer_stats.medium_pool_size
+ self.buffer_stats.large_pool_size,
self.io_stats.vectored_writes,
self.io_stats.avg_batch_size,
self.connection_stats.active_connections,
self.connection_stats.utilization * 100.0,
self.connection_stats.reuse_rate * 100.0
)
}
}