use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
use std::sync::Arc;
use std::ptr;
use anyhow::Result;
use fzstream_common::{EventMessage, SerializationProtocol};
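/// Protocol-level fast paths for the streaming stack: cached serialization,
/// inline encoding, batch processing, and optional unchecked buffer
/// operations, all gated by [`ProtocolOptimizationConfig`] flags.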
pub struct ProtocolStackOptimizer {
config: ProtocolOptimizationConfig,
stats: Arc<ProtocolOptimizationStats>,
fast_path_cache: Arc<FastPathCache>,
}
#[derive(Debug, Clone)]
pub struct ProtocolOptimizationConfig {
pub enable_quic_fast_path: bool,
pub skip_integrity_checks: bool,
pub skip_error_recovery: bool,
pub enable_unchecked_buffers: bool,
pub enable_inline_serialization: bool,
pub enable_batch_processing: bool,
pub max_batch_size: usize,
pub enable_preallocation: bool,
pub enable_raw_pointer_ops: bool,
}
impl Default for ProtocolOptimizationConfig {
fn default() -> Self {
Self {
enable_quic_fast_path: true,
            skip_integrity_checks: true,
            skip_error_recovery: false,
            enable_unchecked_buffers: true,
enable_inline_serialization: true,
enable_batch_processing: true,
max_batch_size: 1000,
enable_preallocation: true,
enable_raw_pointer_ops: true,
}
}
}
#[derive(Default)]
pub struct ProtocolOptimizationStats {
pub fast_path_hits: AtomicU64,
pub slow_path_hits: AtomicU64,
pub checks_skipped: AtomicU64,
pub batch_operations: AtomicU64,
pub unchecked_operations: AtomicU64,
pub inline_operations: AtomicU64,
}
pub struct FastPathCache {
serialization_cache: dashmap::DashMap<String, Vec<u8>>,
hash_cache: dashmap::DashMap<String, u64>,
routing_cache: dashmap::DashMap<String, RouteInfo>,
enabled: AtomicBool,
}
#[derive(Debug, Clone)]
pub struct RouteInfo {
pub endpoint: String,
pub connection_id: u64,
pub last_used: u64,
}
impl ProtocolStackOptimizer {
pub fn new(config: ProtocolOptimizationConfig) -> Result<Self> {
log::info!("🚀 Creating ProtocolStackOptimizer with config: {:?}", config);
let fast_path_cache = Arc::new(FastPathCache {
serialization_cache: dashmap::DashMap::new(),
hash_cache: dashmap::DashMap::new(),
routing_cache: dashmap::DashMap::new(),
enabled: AtomicBool::new(true),
});
let stats = Arc::new(ProtocolOptimizationStats::default());
Ok(Self {
config,
stats,
fast_path_cache,
})
}
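    /// Serializes an event without bounds checks on the hot path.
    ///
    /// When `enable_inline_serialization` is set (the default), the inline
    /// encoder below is always taken and the serialization cache is bypassed.
    ///
    /// # Safety
    ///
    /// `buffer` must be large enough to hold the serialized event; the inline
    /// path writes through raw pointers and performs no bounds checks.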
#[inline(always)]
pub unsafe fn serialize_event_unchecked(
&self,
event: &EventMessage,
buffer: &mut [u8],
) -> Result<usize> {
self.stats.unchecked_operations.fetch_add(1, Ordering::Relaxed);
if self.config.enable_inline_serialization {
self.stats.inline_operations.fetch_add(1, Ordering::Relaxed);
return self.inline_serialize_unchecked(event, buffer);
}
let cache_key = format!("{}_{:?}", event.event_id, event.event_type);
if let Some(cached) = self.fast_path_cache.serialization_cache.get(&cache_key) {
let cached_len = cached.len();
if buffer.len() >= cached_len {
ptr::copy_nonoverlapping(cached.as_ptr(), buffer.as_mut_ptr(), cached_len);
self.stats.fast_path_hits.fetch_add(1, Ordering::Relaxed);
return Ok(cached_len);
}
}
let serialized_size = self.fast_serialize_event(event, buffer)?;
        // Cache small results so repeat events can hit the memcpy fast path above.
        if serialized_size < 4096 {
            let cached_data = buffer[..serialized_size].to_vec();
            self.fast_path_cache.serialization_cache.insert(cache_key, cached_data);
        }
Ok(serialized_size)
}
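    /// Emits a compact wire format straight into `buffer`:
    /// `[u32 id_len][id bytes][u8 event_type][u32 data_len][data][u64 timestamp]`
    /// (native byte order, unaligned writes).
    ///
    /// # Safety
    ///
    /// `buffer` must have room for the full encoding; nothing here checks bounds.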
#[inline(always)]
unsafe fn inline_serialize_unchecked(
&self,
event: &EventMessage,
buffer: &mut [u8],
) -> Result<usize> {
let mut offset = 0;
let event_id_bytes = event.event_id.as_bytes();
let event_id_len = event_id_bytes.len();
        // `write_unaligned`: the buffer offset carries no alignment guarantee.
        ptr::write_unaligned(buffer.as_mut_ptr().add(offset) as *mut u32, event_id_len as u32);
        offset += 4;
super::hardware_optimizations::SIMDMemoryOps::memcpy_simd_optimized(
buffer.as_mut_ptr().add(offset),
event_id_bytes.as_ptr(),
event_id_len
);
offset += event_id_len;
let event_type_byte = match event.event_type {
fzstream_common::EventType::BlockMeta => 0u8,
fzstream_common::EventType::PumpFunBuy => 1u8,
fzstream_common::EventType::BonkBuyExactIn => 2u8,
            _ => 255u8, // catch-all for event types without a dedicated tag
        };
*(buffer.as_mut_ptr().add(offset) as *mut u8) = event_type_byte;
offset += 1;
let data_len = event.data.len();
        ptr::write_unaligned(buffer.as_mut_ptr().add(offset) as *mut u32, data_len as u32);
        offset += 4;
if data_len > 0 {
super::hardware_optimizations::SIMDMemoryOps::memcpy_simd_optimized(
buffer.as_mut_ptr().add(offset),
event.data.as_ptr(),
data_len
);
offset += data_len;
}
        ptr::write_unaligned(buffer.as_mut_ptr().add(offset) as *mut u64, event.timestamp);
        offset += 8;
if self.config.skip_integrity_checks {
            self.stats.checks_skipped.fetch_add(5, Ordering::Relaxed);
        }
Ok(offset)
}
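    /// Dispatches on the event's declared protocol; `Auto` picks JSON for
    /// small payloads (< 1 KiB) and bincode otherwise.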
#[inline(always)]
fn fast_serialize_event(&self, event: &EventMessage, buffer: &mut [u8]) -> Result<usize> {
match event.serialization_format {
SerializationProtocol::Bincode => {
self.fast_bincode_serialize(event, buffer)
}
SerializationProtocol::JSON => {
self.fast_json_serialize(event, buffer)
}
SerializationProtocol::Auto => {
if event.data.len() < 1024 {
self.fast_json_serialize(event, buffer)
} else {
self.fast_bincode_serialize(event, buffer)
}
}
}
}
#[inline(always)]
fn fast_bincode_serialize(&self, event: &EventMessage, buffer: &mut [u8]) -> Result<usize> {
let serialized = bincode::serialize(event)
.map_err(|e| anyhow::anyhow!("Bincode serialization failed: {}", e))?;
if serialized.len() <= buffer.len() {
unsafe {
ptr::copy_nonoverlapping(
serialized.as_ptr(),
buffer.as_mut_ptr(),
serialized.len()
);
}
Ok(serialized.len())
} else {
Err(anyhow::anyhow!("Buffer too small"))
}
}
#[inline(always)]
fn fast_json_serialize(&self, event: &EventMessage, buffer: &mut [u8]) -> Result<usize> {
let json_str = serde_json::to_string(event)
.map_err(|e| anyhow::anyhow!("JSON serialization failed: {}", e))?;
let json_bytes = json_str.as_bytes();
if json_bytes.len() <= buffer.len() {
unsafe {
ptr::copy_nonoverlapping(
json_bytes.as_ptr(),
buffer.as_mut_ptr(),
json_bytes.len()
);
}
Ok(json_bytes.len())
} else {
Err(anyhow::anyhow!("Buffer too small"))
}
}
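    /// Serializes a batch of events into caller-provided buffers, one buffer
    /// per event. Each buffer must be large enough for its event, since this
    /// delegates to the unchecked serializer.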
#[inline(always)]
pub fn process_events_batch(&self, events: &[EventMessage], output_buffers: &mut [&mut [u8]]) -> Result<Vec<usize>> {
if events.len() != output_buffers.len() {
return Err(anyhow::anyhow!("Events and buffers length mismatch"));
}
self.stats.batch_operations.fetch_add(1, Ordering::Relaxed);
let mut sizes = Vec::with_capacity(events.len());
for (event, buffer) in events.iter().zip(output_buffers.iter_mut()) {
let size = unsafe {
self.serialize_event_unchecked(event, buffer)?
};
sizes.push(size);
}
Ok(sizes)
}
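    /// Routes a send through the fast path when enabled. Note that both the
    /// raw and standard send paths below are currently logging stubs.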
#[inline(always)]
pub fn quic_fast_path_send(&self, data: &[u8], connection_id: u64) -> Result<()> {
if !self.config.enable_quic_fast_path {
self.stats.slow_path_hits.fetch_add(1, Ordering::Relaxed);
return self.quic_standard_send(data, connection_id);
}
self.stats.fast_path_hits.fetch_add(1, Ordering::Relaxed);
if self.config.skip_integrity_checks {
self.stats.checks_skipped.fetch_add(1, Ordering::Relaxed);
}
unsafe {
self.raw_quic_send_unchecked(data, connection_id)
}
}
#[inline(always)]
unsafe fn raw_quic_send_unchecked(&self, data: &[u8], connection_id: u64) -> Result<()> {
if !self.config.enable_raw_pointer_ops {
return self.quic_standard_send(data, connection_id);
}
log::trace!("Fast path send: {} bytes to connection {}", data.len(), connection_id);
Ok(())
}
fn quic_standard_send(&self, data: &[u8], connection_id: u64) -> Result<()> {
log::trace!("Standard path send: {} bytes to connection {}", data.len(), connection_id);
Ok(())
}
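    /// Copies `src` into `dst` at `offset`: a SIMD memcpy when unchecked
    /// buffers are enabled, otherwise a truncating safe copy.
    ///
    /// # Safety
    ///
    /// With `enable_unchecked_buffers` set, the caller must guarantee
    /// `dst.len() >= offset + src.len()`; no bounds checks are performed.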
#[inline(always)]
pub unsafe fn unchecked_buffer_write(&self, src: &[u8], dst: &mut [u8], offset: usize) -> usize {
if !self.config.enable_unchecked_buffers {
let available = dst.len().saturating_sub(offset);
let to_copy = src.len().min(available);
dst[offset..offset + to_copy].copy_from_slice(&src[..to_copy]);
return to_copy;
}
self.stats.unchecked_operations.fetch_add(1, Ordering::Relaxed);
let dst_ptr = dst.as_mut_ptr().add(offset);
super::hardware_optimizations::SIMDMemoryOps::memcpy_simd_optimized(
dst_ptr,
src.as_ptr(),
src.len()
);
src.len()
}
pub fn precalculate_routes(&self, endpoints: &[String]) -> Result<()> {
for (index, endpoint) in endpoints.iter().enumerate() {
let route_info = RouteInfo {
endpoint: endpoint.clone(),
connection_id: index as u64,
last_used: 0,
};
self.fast_path_cache.routing_cache.insert(endpoint.clone(), route_info);
}
log::info!("✅ Precalculated {} routes", endpoints.len());
Ok(())
}
#[inline(always)]
pub fn fast_route_lookup(&self, endpoint: &str) -> Option<u64> {
self.fast_path_cache.routing_cache
.get(endpoint)
.map(|route| route.connection_id)
}
pub fn get_stats(&self) -> ProtocolOptimizationStatsSnapshot {
ProtocolOptimizationStatsSnapshot {
fast_path_hits: self.stats.fast_path_hits.load(Ordering::Relaxed),
slow_path_hits: self.stats.slow_path_hits.load(Ordering::Relaxed),
checks_skipped: self.stats.checks_skipped.load(Ordering::Relaxed),
batch_operations: self.stats.batch_operations.load(Ordering::Relaxed),
unchecked_operations: self.stats.unchecked_operations.load(Ordering::Relaxed),
inline_operations: self.stats.inline_operations.load(Ordering::Relaxed),
}
}
pub fn cleanup_cache(&self) {
let cache_size_before = self.fast_path_cache.serialization_cache.len();
self.fast_path_cache.serialization_cache.clear();
self.fast_path_cache.hash_cache.clear();
log::info!("🧹 Cache cleanup: removed {} serialization entries", cache_size_before);
}
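    /// Maximum-throughput preset: every bypass enabled, including skipping
    /// error recovery. Intended only for paths that can tolerate lost checks.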
pub fn extreme_optimization_config() -> ProtocolOptimizationConfig {
ProtocolOptimizationConfig {
enable_quic_fast_path: true,
skip_integrity_checks: true,
            skip_error_recovery: true,
            enable_unchecked_buffers: true,
enable_inline_serialization: true,
enable_batch_processing: true,
            max_batch_size: 10000,
            enable_preallocation: true,
enable_raw_pointer_ops: true,
}
}
}
#[derive(Debug, Clone)]
pub struct ProtocolOptimizationStatsSnapshot {
pub fast_path_hits: u64,
pub slow_path_hits: u64,
pub checks_skipped: u64,
pub batch_operations: u64,
pub unchecked_operations: u64,
pub inline_operations: u64,
}
impl ProtocolOptimizationStatsSnapshot {
pub fn fast_path_hit_rate(&self) -> f64 {
let total = self.fast_path_hits + self.slow_path_hits;
if total == 0 {
0.0
} else {
self.fast_path_hits as f64 / total as f64
}
}
pub fn print_stats(&self) {
log::info!("📊 Protocol Optimization Stats:");
log::info!(" 🚀 Fast Path: {} hits ({:.1}% hit rate)",
self.fast_path_hits, self.fast_path_hit_rate() * 100.0);
log::info!(" 🐌 Slow Path: {} hits", self.slow_path_hits);
log::info!(" ✂️ Checks Skipped: {}", self.checks_skipped);
log::info!(" 📦 Batch Operations: {}", self.batch_operations);
log::info!(" ⚡ Unchecked Ops: {}", self.unchecked_operations);
log::info!(" 🔗 Inline Ops: {}", self.inline_operations);
}
}
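/// Short-circuits a validation: expands to `true` when the bypass flag is
/// set, otherwise evaluates the condition (see `test_bypass_check_macro`).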
#[macro_export]
macro_rules! bypass_check {
($condition:expr, $bypass_enabled:expr) => {
if $bypass_enabled {
true
} else {
$condition
}
};
}
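/// Thin wrapper over [`ProtocolStackOptimizer::serialize_event_unchecked`];
/// callers inherit that method's safety contract on buffer size.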
#[macro_export]
macro_rules! fast_serialize {
($data:expr, $buffer:expr, $optimizer:expr) => {
unsafe {
$optimizer.serialize_event_unchecked($data, $buffer)
}
};
}
#[cfg(test)]
mod tests {
use super::*;
    use fzstream_common::CompressionLevel;
use solana_streamer_sdk::streaming::event_parser::common::EventType;
#[test]
fn test_protocol_optimizer_creation() {
let config = ProtocolOptimizationConfig::default();
let optimizer = ProtocolStackOptimizer::new(config).unwrap();
let stats = optimizer.get_stats();
assert_eq!(stats.fast_path_hits, 0);
assert_eq!(stats.slow_path_hits, 0);
}
#[test]
fn test_extreme_optimization_config() {
let config = ProtocolStackOptimizer::extreme_optimization_config();
assert!(config.enable_quic_fast_path);
assert!(config.skip_integrity_checks);
assert!(config.skip_error_recovery);
assert!(config.enable_unchecked_buffers);
assert_eq!(config.max_batch_size, 10000);
}
#[test]
fn test_unsafe_serialization() {
let config = ProtocolOptimizationConfig::default();
let optimizer = ProtocolStackOptimizer::new(config).unwrap();
let event = EventMessage {
event_id: "test".to_string(),
event_type: EventType::BlockMeta,
data: vec![1, 2, 3, 4, 5],
serialization_format: SerializationProtocol::Bincode,
compression_format: CompressionLevel::None,
is_compressed: false,
timestamp: 1234567890,
original_size: Some(5),
grpc_arrival_time: 0,
parsing_time: 0,
completion_time: 0,
client_processing_start: None,
client_processing_end: None,
};
let mut buffer = vec![0u8; 1024];
let size = unsafe {
optimizer.serialize_event_unchecked(&event, &mut buffer).unwrap()
};
assert!(size > 0);
assert!(size < buffer.len());
let stats = optimizer.get_stats();
assert_eq!(stats.unchecked_operations, 1);
}
#[test]
fn test_route_caching() {
let config = ProtocolOptimizationConfig::default();
let optimizer = ProtocolStackOptimizer::new(config).unwrap();
let endpoints = vec!["127.0.0.1:8080".to_string(), "127.0.0.1:8081".to_string()];
optimizer.precalculate_routes(&endpoints).unwrap();
assert_eq!(optimizer.fast_route_lookup("127.0.0.1:8080"), Some(0));
assert_eq!(optimizer.fast_route_lookup("127.0.0.1:8081"), Some(1));
assert_eq!(optimizer.fast_route_lookup("127.0.0.1:9999"), None);
}
#[test]
fn test_batch_processing() {
let config = ProtocolOptimizationConfig::default();
let optimizer = ProtocolStackOptimizer::new(config).unwrap();
let events = vec![
EventMessage {
event_id: "test1".to_string(),
event_type: EventType::BlockMeta,
data: vec![1, 2, 3],
serialization_format: SerializationProtocol::Bincode,
compression_format: CompressionLevel::None,
is_compressed: false,
timestamp: 1234567890,
original_size: Some(3),
grpc_arrival_time: 0,
parsing_time: 0,
completion_time: 0,
client_processing_start: None,
client_processing_end: None,
},
EventMessage {
event_id: "test2".to_string(),
event_type: EventType::BlockMeta,
data: vec![4, 5, 6],
serialization_format: SerializationProtocol::Bincode,
compression_format: CompressionLevel::None,
is_compressed: false,
timestamp: 1234567891,
original_size: Some(3),
grpc_arrival_time: 0,
parsing_time: 0,
completion_time: 0,
client_processing_start: None,
client_processing_end: None,
},
];
let mut buffer1 = vec![0u8; 1024];
let mut buffer2 = vec![0u8; 1024];
let mut buffers = vec![buffer1.as_mut_slice(), buffer2.as_mut_slice()];
let sizes = optimizer.process_events_batch(&events, &mut buffers).unwrap();
assert_eq!(sizes.len(), 2);
assert!(sizes[0] > 0);
assert!(sizes[1] > 0);
let stats = optimizer.get_stats();
assert_eq!(stats.batch_operations, 1);
}
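    // Added sketch: exercises the checked fallback of `unchecked_buffer_write`
    // (enable_unchecked_buffers = false), which truncates at the destination end.
    #[test]
    fn test_checked_buffer_write_truncates() {
        let config = ProtocolOptimizationConfig {
            enable_unchecked_buffers: false,
            ..ProtocolOptimizationConfig::default()
        };
        let optimizer = ProtocolStackOptimizer::new(config).unwrap();
        let src = [1u8, 2, 3, 4, 5];
        let mut dst = [0u8; 4];
        // Only two bytes fit after offset 2, so the checked path copies two.
        let written = unsafe { optimizer.unchecked_buffer_write(&src, &mut dst, 2) };
        assert_eq!(written, 2);
        assert_eq!(dst, [0, 0, 1, 2]);
    }
    // Added sketch: `bypass_check!` ignores the condition once the bypass flag is set.
    #[test]
    fn test_bypass_check_macro() {
        assert!(bypass_check!(true, false)); // condition holds, no bypass
        assert!(!bypass_check!(false, false)); // condition fails, no bypass
        assert!(bypass_check!(false, true)); // bypass wins regardless of condition
    }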
}