#![allow(dead_code)]
use crate::Result;
use bytes::{BufMut, Bytes, BytesMut};
use parking_lot::RwLock;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
#[derive(Clone)]
pub struct SocketConfig {
pub tcp_nodelay: bool,
pub send_buffer_size: usize,
pub receive_buffer_size: usize,
pub keepalive: bool,
pub reuse_address: bool,
pub reuse_port: bool,
}
impl Default for SocketConfig {
fn default() -> Self {
Self {
tcp_nodelay: true, send_buffer_size: 1024 * 1024, receive_buffer_size: 1024 * 1024, keepalive: true,
reuse_address: true,
reuse_port: true, }
}
}
pub struct AdvancedConnectionManager {
connections: Arc<RwLock<HashMap<u64, Arc<OptimizedConnection>>>>,
response_cache: Arc<RwLock<HashMap<String, Bytes>>>,
write_buffers: Arc<RwLock<VecDeque<BytesMut>>>,
total_connections: AtomicU64,
active_connections: AtomicU64,
bytes_sent: AtomicU64,
bytes_received: AtomicU64,
responses_cached: AtomicU64,
cache_hits: AtomicU64,
config: SocketConfig,
next_connection_id: AtomicU64,
}
pub struct OptimizedConnection {
pub id: u64,
pub stream: Arc<RwLock<TcpStream>>,
pub write_buffer: Arc<RwLock<BytesMut>>,
pub pending_writes: Arc<RwLock<VecDeque<Bytes>>>,
pub bytes_sent: AtomicU64,
pub bytes_received: AtomicU64,
pub last_activity: AtomicU64,
pub is_active: AtomicBool,
pub response_cache: Arc<RwLock<HashMap<String, Bytes>>>,
pub write_coalescing: AtomicBool, }
impl AdvancedConnectionManager {
pub fn new() -> Self {
Self {
connections: Arc::new(RwLock::new(HashMap::new())),
response_cache: Arc::new(RwLock::new(HashMap::new())),
write_buffers: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
total_connections: AtomicU64::new(0),
active_connections: AtomicU64::new(0),
bytes_sent: AtomicU64::new(0),
bytes_received: AtomicU64::new(0),
responses_cached: AtomicU64::new(0),
cache_hits: AtomicU64::new(0),
config: SocketConfig::default(),
next_connection_id: AtomicU64::new(1),
}
}
pub async fn register_optimized_connection(&self, mut stream: TcpStream) -> Result<u64> {
if let Err(e) = self.tune_socket(&mut stream).await {
tracing::warn!("Failed to tune socket: {}", e);
}
let id = self.next_connection_id.fetch_add(1, Ordering::Relaxed);
let connection = Arc::new(OptimizedConnection {
id,
stream: Arc::new(RwLock::new(stream)),
write_buffer: Arc::new(RwLock::new(BytesMut::with_capacity(65536))),
pending_writes: Arc::new(RwLock::new(VecDeque::new())),
bytes_sent: AtomicU64::new(0),
bytes_received: AtomicU64::new(0),
last_activity: AtomicU64::new(Self::current_timestamp()),
is_active: AtomicBool::new(true),
response_cache: Arc::new(RwLock::new(HashMap::new())),
write_coalescing: AtomicBool::new(true),
});
self.connections.write().insert(id, connection);
self.total_connections.fetch_add(1, Ordering::Relaxed);
self.active_connections.fetch_add(1, Ordering::Relaxed);
Ok(id)
}
async fn tune_socket(&self, stream: &mut TcpStream) -> Result<()> {
if self.config.tcp_nodelay {
stream.set_nodelay(true).map_err(|e| {
crate::FluxmqError::Network(format!("Failed to set TCP_NODELAY: {}", e))
})?;
}
Ok(())
}
pub async fn send_optimized_response(&self, connection_id: u64, data: Bytes) -> Result<()> {
let connection = {
let connections = self.connections.read();
connections.get(&connection_id).cloned()
};
if let Some(conn) = connection {
if conn.write_coalescing.load(Ordering::Relaxed) {
conn.pending_writes.write().push_back(data);
if conn.pending_writes.read().len() >= 10 {
self.flush_coalesced_writes(connection_id).await?;
}
} else {
self.direct_write(conn, data).await?;
}
Ok(())
} else {
Err(crate::FluxmqError::Network(
"Connection not found".to_string(),
))
}
}
pub async fn flush_coalesced_writes(&self, connection_id: u64) -> Result<()> {
let connection = {
let connections = self.connections.read();
connections.get(&connection_id).cloned()
};
if let Some(conn) = connection {
let pending_writes: Vec<Bytes> = {
let mut pending = conn.pending_writes.write();
pending.drain(..).collect()
};
if !pending_writes.is_empty() {
let total_size: usize = pending_writes.iter().map(|b| b.len()).sum();
let mut coalesced_buffer = BytesMut::with_capacity(total_size);
for data in pending_writes {
coalesced_buffer.extend_from_slice(&data);
}
self.direct_write(conn, coalesced_buffer.freeze()).await?;
}
}
Ok(())
}
async fn direct_write(&self, connection: Arc<OptimizedConnection>, data: Bytes) -> Result<()> {
let mut stream = connection.stream.write();
match stream.write_all(&data).await {
Ok(_) => {
connection
.bytes_sent
.fetch_add(data.len() as u64, Ordering::Relaxed);
self.bytes_sent
.fetch_add(data.len() as u64, Ordering::Relaxed);
connection
.last_activity
.store(Self::current_timestamp(), Ordering::Relaxed);
Ok(())
}
Err(e) => {
connection.is_active.store(false, Ordering::Relaxed);
Err(crate::FluxmqError::Network(format!("Write failed: {}", e)))
}
}
}
pub fn cache_response(&self, key: String, response: Bytes) {
let mut cache = self.response_cache.write();
if cache.len() >= 10000 {
let keys_to_remove: Vec<String> = cache.keys().take(1000).cloned().collect();
for key in keys_to_remove {
cache.remove(&key);
}
}
cache.insert(key, response);
self.responses_cached.fetch_add(1, Ordering::Relaxed);
}
pub fn get_cached_response(&self, key: &str) -> Option<Bytes> {
if let Some(response) = self.response_cache.read().get(key).cloned() {
self.cache_hits.fetch_add(1, Ordering::Relaxed);
Some(response)
} else {
None
}
}
fn current_timestamp() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
}
pub fn get_advanced_stats(&self) -> AdvancedNetworkStats {
let total_connections = self.total_connections.load(Ordering::Relaxed);
let active_connections = self.active_connections.load(Ordering::Relaxed);
let bytes_sent = self.bytes_sent.load(Ordering::Relaxed);
let bytes_received = self.bytes_received.load(Ordering::Relaxed);
let responses_cached = self.responses_cached.load(Ordering::Relaxed);
let cache_hits = self.cache_hits.load(Ordering::Relaxed);
AdvancedNetworkStats {
total_connections,
active_connections,
bytes_sent,
bytes_received,
responses_cached,
cache_hits,
cache_hit_rate: if responses_cached > 0 {
cache_hits as f64 / responses_cached as f64
} else {
0.0
},
avg_bytes_per_connection: if total_connections > 0 {
bytes_sent as f64 / total_connections as f64
} else {
0.0
},
}
}
}
#[derive(Debug, Clone)]
pub struct AdvancedNetworkStats {
pub total_connections: u64,
pub active_connections: u64,
pub bytes_sent: u64,
pub bytes_received: u64,
pub responses_cached: u64,
pub cache_hits: u64,
pub cache_hit_rate: f64,
pub avg_bytes_per_connection: f64,
}
impl AdvancedNetworkStats {
pub fn report(&self) -> String {
format!(
"Advanced Networking Stats:\n\
Connections: {} active / {} total\n\
Data Transfer: {:.1} MB sent, {:.1} MB received\n\
Response Caching: {} cached, {:.1}% hit rate\n\
Average: {:.0} bytes/connection",
self.active_connections,
self.total_connections,
self.bytes_sent as f64 / 1_000_000.0,
self.bytes_received as f64 / 1_000_000.0,
self.responses_cached,
self.cache_hit_rate * 100.0,
self.avg_bytes_per_connection
)
}
}
pub struct ZeroAllocResponseBuilder {
produce_template: BytesMut,
fetch_template: BytesMut,
metadata_template: BytesMut,
api_versions_template: BytesMut,
buffer_pool: Arc<RwLock<VecDeque<BytesMut>>>,
responses_built: AtomicU64,
zero_alloc_responses: AtomicU64,
template_uses: AtomicU64,
}
impl ZeroAllocResponseBuilder {
pub fn new() -> Self {
let mut builder = Self {
produce_template: BytesMut::with_capacity(256),
fetch_template: BytesMut::with_capacity(512),
metadata_template: BytesMut::with_capacity(1024),
api_versions_template: BytesMut::with_capacity(512),
buffer_pool: Arc::new(RwLock::new(VecDeque::with_capacity(100))),
responses_built: AtomicU64::new(0),
zero_alloc_responses: AtomicU64::new(0),
template_uses: AtomicU64::new(0),
};
builder.build_templates();
builder
}
fn build_templates(&mut self) {
self.produce_template.clear();
self.produce_template.put_u32(0); self.produce_template.put_i32(0); self.produce_template.put_u16(0);
self.api_versions_template.clear();
self.api_versions_template.put_u32(0); self.api_versions_template.put_i32(0); self.api_versions_template.put_i16(0); self.api_versions_template.put_u8(6);
for (api_key, min_ver, max_ver) in [
(0u16, 0u16, 8u16),
(1, 0, 11),
(3, 0, 9),
(18, 0, 4),
(19, 0, 5),
] {
self.api_versions_template.put_u16(api_key);
self.api_versions_template.put_u16(min_ver);
self.api_versions_template.put_u16(max_ver);
self.api_versions_template.put_u8(0); }
self.api_versions_template.put_u8(0); self.api_versions_template.put_u32(0); }
pub fn build_produce_response(
&self,
correlation_id: i32,
topic: &str,
partition: u32,
offset: u64,
) -> Bytes {
self.responses_built.fetch_add(1, Ordering::Relaxed);
let mut buffer = {
let mut pool = self.buffer_pool.write();
pool.pop_front()
.unwrap_or_else(|| BytesMut::with_capacity(256))
};
if buffer.capacity() >= 256 {
self.zero_alloc_responses.fetch_add(1, Ordering::Relaxed);
}
buffer.clear();
let response_size = 4 + 4 + 2 + 2 + topic.len() + 4 + 8 + 2;
buffer.put_u32(response_size as u32 - 4); buffer.put_i32(correlation_id);
buffer.put_i16(0); buffer.put_u16(topic.len() as u16);
buffer.put_slice(topic.as_bytes());
buffer.put_u32(partition);
buffer.put_u64(offset);
buffer.put_i16(0);
let buffer_capacity = buffer.capacity();
let response = buffer.freeze();
if buffer_capacity >= 128 && buffer_capacity <= 512 {
let buffer_mut = BytesMut::with_capacity(buffer_capacity);
let mut pool = self.buffer_pool.write();
if pool.len() < 100 {
pool.push_back(buffer_mut);
}
}
response
}
pub fn build_api_versions_response(&self, correlation_id: i32) -> Bytes {
self.responses_built.fetch_add(1, Ordering::Relaxed);
self.template_uses.fetch_add(1, Ordering::Relaxed);
let mut response = BytesMut::from(&self.api_versions_template[..]);
(&mut response[4..8]).copy_from_slice(&correlation_id.to_be_bytes());
let length = response.len() - 4;
(&mut response[0..4]).copy_from_slice(&(length as u32).to_be_bytes());
self.zero_alloc_responses.fetch_add(1, Ordering::Relaxed);
response.freeze()
}
pub fn get_stats(&self) -> ResponseBuilderStats {
let responses_built = self.responses_built.load(Ordering::Relaxed);
let zero_alloc_responses = self.zero_alloc_responses.load(Ordering::Relaxed);
let template_uses = self.template_uses.load(Ordering::Relaxed);
ResponseBuilderStats {
responses_built,
zero_alloc_responses,
template_uses,
zero_alloc_rate: if responses_built > 0 {
zero_alloc_responses as f64 / responses_built as f64
} else {
0.0
},
template_usage_rate: if responses_built > 0 {
template_uses as f64 / responses_built as f64
} else {
0.0
},
}
}
}
#[derive(Debug, Clone)]
pub struct ResponseBuilderStats {
pub responses_built: u64,
pub zero_alloc_responses: u64,
pub template_uses: u64,
pub zero_alloc_rate: f64,
pub template_usage_rate: f64,
}
impl ResponseBuilderStats {
pub fn report(&self) -> String {
format!(
"Response Builder Stats:\n\
Responses Built: {}\n\
Zero-Alloc Rate: {:.1}%\n\
Template Usage: {:.1}%",
self.responses_built,
self.zero_alloc_rate * 100.0,
self.template_usage_rate * 100.0
)
}
}