use std::sync::{Arc, atomic::{AtomicU64, AtomicUsize, AtomicBool, Ordering}};
use std::time::{Duration, Instant};
use crossbeam_queue::ArrayQueue;
use crossbeam_utils::CachePadded;
use fzstream_common::EventMessage;
use tokio::sync::Notify;
use anyhow::Result;
use log::{info, warn, debug};
/// Sharded, lock-free event dispatcher.
///
/// Each client id is assigned (sticky, round-robin on first sight) to one of
/// `event_queues`; producers push without locks and worker tasks drain the
/// shards (see `start_processing_workers`).
pub struct LockFreeEventDispatcher {
// One bounded lock-free MPMC queue per shard.
event_queues: Vec<Arc<ArrayQueue<EventMessage>>>,
// client_id -> shard index; populated on a client's first dispatch.
client_queue_mapping: Arc<dashmap::DashMap<String, usize>>,
// Round-robin counter used to assign shards to new clients.
// CachePadded keeps this hot counter off neighbouring cache lines.
queue_selector: CachePadded<AtomicUsize>,
// Latency/throughput counters shared with worker tasks.
stats: Arc<UltraLowLatencyStats>,
// Best-effort event cache; see `PrefetchOptimizer`.
prefetch_optimizer: Arc<PrefetchOptimizer>,
// Optional CPU pinning for worker tasks (Linux/Windows only).
cpu_affinity: Option<CpuAffinityConfig>,
}
/// Configuration for pinning worker threads to CPU cores.
#[derive(Clone, Debug)]
pub struct CpuAffinityConfig {
/// Cores to pin workers to; worker `i` uses `core_ids[i % core_ids.len()]`.
pub core_ids: Vec<usize>,
// NOTE(review): not read anywhere in this file — presumably consumed by
// other code or planned; confirm before removing.
pub numa_optimization: bool,
// NOTE(review): also unused by `set_thread_affinity` as written.
pub priority: ThreadPriority,
}
/// Requested scheduling priority for worker threads.
///
/// NOTE(review): no code in this file acts on the chosen variant yet;
/// `set_thread_affinity` only sets CPU affinity.
#[derive(Clone, Debug)]
pub enum ThreadPriority {
Normal,
High,
RealTime,
}
/// Best-effort bounded cache of recently dispatched events.
///
/// Events are pushed on dispatch and popped via `try_get_prefetched`;
/// hit/miss counters track pop outcomes.
pub struct PrefetchOptimizer {
// Bounded lock-free cache; full cache silently drops new entries.
prediction_cache: Arc<ArrayQueue<EventMessage>>,
// Successful pops from the cache.
hit_count: AtomicU64,
// Pops that found the cache empty.
miss_count: AtomicU64,
// When false, `prefetch_event_data` is a no-op.
learning_enabled: AtomicBool,
}
impl PrefetchOptimizer {
    /// Creates an optimizer whose cache holds at most `cache_size` events.
    pub fn new(cache_size: usize) -> Self {
        Self {
            prediction_cache: Arc::new(ArrayQueue::new(cache_size)),
            hit_count: AtomicU64::new(0),
            miss_count: AtomicU64::new(0),
            learning_enabled: AtomicBool::new(true),
        }
    }

    /// Caches a copy of `event` (best effort).
    ///
    /// No-op when learning is disabled. A full cache silently drops the
    /// event — failing to cache is never an error.
    #[inline(always)]
    pub fn prefetch_event_data(&self, event: &EventMessage) {
        if !self.learning_enabled.load(Ordering::Relaxed) {
            return;
        }
        // `push` fails only when the queue is full; drop the copy in that case.
        // (`let _ =` replaces the original empty `if let Ok(_)` block.)
        let _ = self.prediction_cache.push(event.clone());
    }

    /// Pops a cached event, recording the outcome as a hit or a miss.
    #[inline(always)]
    pub fn try_get_prefetched(&self) -> Option<EventMessage> {
        match self.prediction_cache.pop() {
            Some(event) => {
                self.hit_count.fetch_add(1, Ordering::Relaxed);
                Some(event)
            }
            None => {
                self.miss_count.fetch_add(1, Ordering::Relaxed);
                None
            }
        }
    }

    /// Returns `(hits, misses, hit_rate)`.
    ///
    /// `hit_rate` is 0.0 before any lookup has happened.
    pub fn get_stats(&self) -> (u64, u64, f64) {
        let hits = self.hit_count.load(Ordering::Relaxed);
        let misses = self.miss_count.load(Ordering::Relaxed);
        let total = hits + misses;
        let hit_rate = if total > 0 { hits as f64 / total as f64 } else { 0.0 };
        (hits, misses, hit_rate)
    }
}
/// Lock-free latency/throughput counters, one cache line per counter
/// (`CachePadded`) so concurrent writers do not false-share.
pub struct UltraLowLatencyStats {
// Total events recorded via `record_event_latency`.
pub events_processed: CachePadded<AtomicU64>,
// Sum of all recorded latencies, in nanoseconds.
pub total_latency_ns: CachePadded<AtomicU64>,
// Minimum observed latency; u64::MAX is the "no samples yet" sentinel.
pub min_latency_ns: CachePadded<AtomicU64>,
// Maximum observed latency.
pub max_latency_ns: CachePadded<AtomicU64>,
// Cumulative buckets: an event under 10µs increments all three below.
pub sub_millisecond_events: CachePadded<AtomicU64>,
// Events under 100µs.
pub ultra_fast_events: CachePadded<AtomicU64>,
// Events under 10µs.
pub lightning_fast_events: CachePadded<AtomicU64>,
// Dispatches rejected because a shard queue was full.
pub queue_overflows: CachePadded<AtomicU64>,
// NOTE(review): never written in this file; `PrefetchOptimizer` keeps its
// own hit counter — confirm whether this field is still needed.
pub prefetch_hits: CachePadded<AtomicU64>,
}
impl UltraLowLatencyStats {
    /// Creates a zeroed counter set; `min_latency_ns` starts at the
    /// `u64::MAX` sentinel so the first sample always wins.
    pub fn new() -> Self {
        Self {
            events_processed: CachePadded::new(AtomicU64::new(0)),
            total_latency_ns: CachePadded::new(AtomicU64::new(0)),
            min_latency_ns: CachePadded::new(AtomicU64::new(u64::MAX)),
            max_latency_ns: CachePadded::new(AtomicU64::new(0)),
            sub_millisecond_events: CachePadded::new(AtomicU64::new(0)),
            ultra_fast_events: CachePadded::new(AtomicU64::new(0)),
            lightning_fast_events: CachePadded::new(AtomicU64::new(0)),
            queue_overflows: CachePadded::new(AtomicU64::new(0)),
            prefetch_hits: CachePadded::new(AtomicU64::new(0)),
        }
    }

    /// Records one event's latency (nanoseconds) into all counters.
    #[inline(always)]
    pub fn record_event_latency(&self, latency_ns: u64) {
        self.events_processed.fetch_add(1, Ordering::Relaxed);
        self.total_latency_ns.fetch_add(latency_ns, Ordering::Relaxed);
        // Atomic min/max replace the original hand-rolled
        // compare_exchange_weak loops (fetch_min/fetch_max, Rust 1.45+).
        self.min_latency_ns.fetch_min(latency_ns, Ordering::Relaxed);
        self.max_latency_ns.fetch_max(latency_ns, Ordering::Relaxed);
        // Cumulative buckets: a 5µs event increments all three.
        if latency_ns < 1_000_000 {
            self.sub_millisecond_events.fetch_add(1, Ordering::Relaxed);
        }
        if latency_ns < 100_000 {
            self.ultra_fast_events.fetch_add(1, Ordering::Relaxed);
        }
        if latency_ns < 10_000 {
            self.lightning_fast_events.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Produces a point-in-time summary.
    ///
    /// Counters are read with relaxed loads, so the snapshot is only
    /// approximately consistent under concurrent writers.
    pub fn get_summary(&self) -> UltraLatencySummary {
        let events_processed = self.events_processed.load(Ordering::Relaxed);
        let total_latency_ns = self.total_latency_ns.load(Ordering::Relaxed);
        let min_latency_ns = self.min_latency_ns.load(Ordering::Relaxed);
        let max_latency_ns = self.max_latency_ns.load(Ordering::Relaxed);
        // Shared helper: a bucket count as a percentage of all events.
        let pct = |count: u64| {
            if events_processed > 0 {
                count as f64 / events_processed as f64 * 100.0
            } else {
                0.0
            }
        };
        let avg_latency_ns = if events_processed > 0 {
            total_latency_ns as f64 / events_processed as f64
        } else {
            0.0
        };
        UltraLatencySummary {
            events_processed,
            avg_latency_ns,
            // u64::MAX means "no samples yet"; report 0 instead of the sentinel.
            min_latency_ns: if min_latency_ns == u64::MAX { 0.0 } else { min_latency_ns as f64 },
            max_latency_ns: max_latency_ns as f64,
            avg_latency_us: avg_latency_ns / 1000.0,
            sub_millisecond_percentage: pct(self.sub_millisecond_events.load(Ordering::Relaxed)),
            ultra_fast_percentage: pct(self.ultra_fast_events.load(Ordering::Relaxed)),
            lightning_fast_percentage: pct(self.lightning_fast_events.load(Ordering::Relaxed)),
            // Target: sub-millisecond average latency.
            target_achieved: avg_latency_ns < 1_000_000.0,
        }
    }
}

impl Default for UltraLowLatencyStats {
    fn default() -> Self {
        Self::new()
    }
}
/// Point-in-time snapshot produced by `UltraLowLatencyStats::get_summary`.
/// All latency figures are in the unit their name states (ns or µs).
#[derive(Debug, Clone)]
pub struct UltraLatencySummary {
pub events_processed: u64,
pub avg_latency_ns: f64,
// 0.0 when no events have been recorded yet.
pub min_latency_ns: f64,
pub max_latency_ns: f64,
pub avg_latency_us: f64,
// Percentage of events under 1 ms.
pub sub_millisecond_percentage: f64,
// Percentage of events under 100 µs.
pub ultra_fast_percentage: f64,
// Percentage of events under 10 µs.
pub lightning_fast_percentage: f64,
// True when the average latency is below 1 ms.
pub target_achieved: bool,
}
impl LockFreeEventDispatcher {
    /// Creates a dispatcher with `num_queues` shards holding up to
    /// `queue_capacity` events each.
    ///
    /// `cpu_affinity` optionally pins worker tasks to cores
    /// (effective on Linux and Windows only; see `set_thread_affinity`).
    pub fn new(
        num_queues: usize,
        queue_capacity: usize,
        cpu_affinity: Option<CpuAffinityConfig>
    ) -> Self {
        let mut event_queues = Vec::with_capacity(num_queues);
        for _ in 0..num_queues {
            event_queues.push(Arc::new(ArrayQueue::new(queue_capacity)));
        }
        info!("🚀 Created LockFreeEventDispatcher: {} queues, capacity {} each",
            num_queues, queue_capacity);
        Self {
            event_queues,
            client_queue_mapping: Arc::new(dashmap::DashMap::new()),
            queue_selector: CachePadded::new(AtomicUsize::new(0)),
            stats: Arc::new(UltraLowLatencyStats::new()),
            prefetch_optimizer: Arc::new(PrefetchOptimizer::new(1000)),
            cpu_affinity,
        }
    }

    /// Pushes `event` onto the shard assigned to `client_id`.
    ///
    /// First-time clients are assigned a shard round-robin; the assignment
    /// is sticky afterwards. On a full shard the event is dropped,
    /// `queue_overflows` is bumped, and an error is returned.
    #[inline(always)]
    pub fn dispatch_event_ultra_fast(&self, client_id: &str, event: EventMessage) -> Result<()> {
        let start_time = Instant::now();
        // Fast path: existing clients are looked up by &str, no allocation.
        // Slow path uses the entry API so two threads racing on a brand-new
        // client cannot install different shard indices (the original
        // get-then-insert sequence was racy).
        let queue_index = match self.client_queue_mapping.get(client_id) {
            Some(index) => *index,
            None => *self
                .client_queue_mapping
                .entry(client_id.to_string())
                .or_insert_with(|| {
                    self.queue_selector.fetch_add(1, Ordering::Relaxed) % self.event_queues.len()
                }),
        };
        self.prefetch_optimizer.prefetch_event_data(&event);
        let queue = &self.event_queues[queue_index];
        match queue.push(event) {
            Ok(_) => {
                let latency_ns = start_time.elapsed().as_nanos() as u64;
                self.stats.record_event_latency(latency_ns);
                Ok(())
            }
            Err(_) => {
                self.stats.queue_overflows.fetch_add(1, Ordering::Relaxed);
                Err(anyhow::anyhow!("Queue overflow for client: {}", client_id))
            }
        }
    }

    /// Spawns `num_workers` detached tasks that drain the shard queues.
    ///
    /// NOTE(review): there is no shutdown handle — workers run for the
    /// lifetime of the runtime.
    pub async fn start_processing_workers(&self, num_workers: usize) -> Result<()> {
        info!("🚀 Starting {} ultra-low-latency processing workers", num_workers);
        for worker_id in 0..num_workers {
            let queues = self.event_queues.clone();
            let stats = Arc::clone(&self.stats);
            let cpu_affinity = self.cpu_affinity.clone();
            tokio::spawn(async move {
                // Affinity is best effort; a failure only logs a warning.
                if let Some(affinity_config) = &cpu_affinity {
                    if let Err(e) = Self::set_thread_affinity(worker_id, affinity_config) {
                        warn!("Failed to set CPU affinity for worker {}: {}", worker_id, e);
                    } else {
                        info!("✅ Worker {} bound to CPU core", worker_id);
                    }
                }
                Self::worker_main_loop(worker_id, queues, stats).await;
            });
        }
        Ok(())
    }

    /// Drains all shards round-robin, starting at this worker's own shard.
    ///
    /// At most 100 events are popped from one shard per visit so a hot
    /// shard cannot starve the rest. When a full sweep finds no work the
    /// task yields, then sleeps briefly.
    async fn worker_main_loop(
        worker_id: usize,
        queues: Vec<Arc<ArrayQueue<EventMessage>>>,
        stats: Arc<UltraLowLatencyStats>
    ) {
        info!("🔄 Worker {} started ultra-low-latency processing loop", worker_id);
        // Start each worker at a different shard to spread contention.
        let mut queue_index = worker_id;
        loop {
            let mut processed_any = false;
            for _ in 0..queues.len() {
                let queue = &queues[queue_index % queues.len()];
                // Bounded batch per shard keeps the sweep fair.
                let mut batch_count = 0;
                while batch_count < 100 {
                    match queue.pop() {
                        Some(event) => {
                            let process_start = Instant::now();
                            Self::process_event_ultra_fast(&event).await;
                            let process_latency = process_start.elapsed().as_nanos() as u64;
                            stats.record_event_latency(process_latency);
                            processed_any = true;
                            batch_count += 1;
                        }
                        None => break,
                    }
                }
                queue_index = (queue_index + 1) % queues.len();
            }
            if !processed_any {
                // Idle back-off. The original select!-ed this sleep against a
                // local Notify that nothing ever signalled, so the sleep was
                // the only arm that could fire; the dead Notify is removed.
                // NOTE(review): tokio's timer granularity is about 1 ms, so
                // this sleeps far longer than 100 ns in practice.
                tokio::task::yield_now().await;
                tokio::time::sleep(Duration::from_nanos(100)).await;
            }
        }
    }

    /// Placeholder per-event processing: logs the payload size and yields.
    #[inline(always)]
    async fn process_event_ultra_fast(event: &EventMessage) {
        debug!("Processing event: {} bytes", event.data.len());
        tokio::task::yield_now().await;
    }

    /// Pins the calling thread to `core_ids[worker_id % len]`.
    ///
    /// Linux uses `sched_setaffinity` (pid 0 = calling thread); Windows uses
    /// `SetThreadAffinityMask`; macOS has no public affinity API, so this is
    /// a logged no-op there. An empty `core_ids` is also a no-op.
    ///
    /// # Errors
    /// Returns an error when the OS call rejects the requested core.
    fn set_thread_affinity(worker_id: usize, config: &CpuAffinityConfig) -> Result<()> {
        if config.core_ids.is_empty() {
            return Ok(());
        }
        #[allow(unused_variables)] // unused on macOS
        let core_id = config.core_ids[worker_id % config.core_ids.len()];
        #[cfg(target_os = "linux")]
        {
            use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
            // SAFETY: cpu_set_t is plain-old-data, so zeroing it is valid;
            // CPU_ZERO/CPU_SET only write into the local cpuset we own.
            unsafe {
                let mut cpuset: cpu_set_t = std::mem::zeroed();
                CPU_ZERO(&mut cpuset);
                CPU_SET(core_id, &mut cpuset);
                if sched_setaffinity(0, std::mem::size_of::<cpu_set_t>(), &cpuset) != 0 {
                    return Err(anyhow::anyhow!("Failed to set CPU affinity to core {}", core_id));
                }
            }
        }
        #[cfg(target_os = "macos")]
        {
            // The original log claimed a thread priority was set instead;
            // nothing of the sort happens, so say what actually happens.
            info!("CPU affinity not supported on macOS; skipping");
        }
        #[cfg(target_os = "windows")]
        {
            use winapi::um::processthreadsapi::{GetCurrentThread, SetThreadAffinityMask};
            // SAFETY: GetCurrentThread returns a pseudo-handle that is always
            // valid for the calling thread; SetThreadAffinityMask returns 0
            // only on failure.
            unsafe {
                let affinity_mask = 1u64 << core_id;
                if SetThreadAffinityMask(GetCurrentThread(), affinity_mask as usize) == 0 {
                    return Err(anyhow::anyhow!("Failed to set CPU affinity to core {}", core_id));
                }
            }
        }
        Ok(())
    }

    /// Snapshot of the shared latency counters.
    pub fn get_performance_stats(&self) -> UltraLatencySummary {
        self.stats.get_summary()
    }

    /// `(hits, misses, hit_rate)` from the prefetch cache.
    pub fn get_prefetch_stats(&self) -> (u64, u64, f64) {
        self.prefetch_optimizer.get_stats()
    }

    /// Current depth of every shard queue as `(shard_index, len)` pairs.
    pub fn get_queue_stats(&self) -> Vec<(usize, usize)> {
        self.event_queues.iter().enumerate()
            .map(|(i, queue)| (i, queue.len()))
            .collect()
    }
}
/// bincode serializer that loans buffers from a fixed pool to avoid
/// per-call allocation. Callers must hand buffers back via `return_buffer`.
pub struct ZeroAllocSerializer {
// Reusable buffers; empty pool falls back to fresh allocation.
buffer_pool: Arc<ArrayQueue<Vec<u8>>>,
// event_type -> last observed serialized size, used to pre-size
// fallback allocations.
size_hints: Arc<dashmap::DashMap<String, usize>>,
}
impl ZeroAllocSerializer {
    /// Creates a pool of `pool_size` buffers, each with `buffer_size`
    /// initial capacity.
    pub fn new(pool_size: usize, buffer_size: usize) -> Self {
        let buffer_pool = Arc::new(ArrayQueue::new(pool_size));
        for _ in 0..pool_size {
            // The queue is sized exactly to pool_size, so pushes cannot fail.
            let _ = buffer_pool.push(Vec::with_capacity(buffer_size));
        }
        Self {
            buffer_pool,
            size_hints: Arc::new(dashmap::DashMap::new()),
        }
    }

    /// Serializes `value` with bincode into a pooled buffer.
    ///
    /// The returned buffer is loaned from the pool; hand it back via
    /// `return_buffer` or the pool slowly drains. `event_type` keys the
    /// size hint used when the pool is empty.
    ///
    /// # Errors
    /// Propagates bincode serialization failures.
    #[inline(always)]
    pub fn serialize_zero_alloc<T: serde::Serialize>(&self, value: &T, event_type: &str) -> Result<Vec<u8>> {
        let mut buffer = if let Some(buf) = self.buffer_pool.pop() {
            buf
        } else {
            // Pool exhausted: allocate, pre-sized from the last payload seen
            // for this event type (default 1 KiB).
            let hint_size = self.size_hints.get(event_type)
                .map(|entry| *entry)
                .unwrap_or(1024);
            Vec::with_capacity(hint_size)
        };
        buffer.clear();
        // Serialize straight into the pooled buffer. The original called
        // bincode::serialize (which allocates a temporary Vec) and copied it
        // over, paying one extra allocation + copy per call — exactly what
        // this type exists to avoid.
        bincode::serialize_into(&mut buffer, value)?;
        self.size_hints.insert(event_type.to_string(), buffer.len());
        Ok(buffer)
    }

    /// Returns a loaned buffer to the pool.
    ///
    /// Buffers over 1 MiB are dropped instead, so one huge payload cannot
    /// pin memory in the pool forever. A full pool also drops the buffer.
    #[inline(always)]
    pub fn return_buffer(&self, buffer: Vec<u8>) {
        if buffer.capacity() <= 1024 * 1024 {
            let _ = self.buffer_pool.push(buffer);
        }
    }

    /// Returns `(buffers_available, pool_capacity)`.
    pub fn get_pool_stats(&self) -> (usize, usize) {
        (self.buffer_pool.len(), self.buffer_pool.capacity())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use fzstream_common::SerializationProtocol;
    use solana_streamer_sdk::streaming::event_parser::common::EventType;

    /// Dispatching one event succeeds and shows up in the latency stats.
    #[tokio::test]
    async fn test_lockfree_dispatcher() {
        let dispatcher = LockFreeEventDispatcher::new(4, 1000, None);
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;
        let msg = EventMessage {
            event_id: "test_1".to_string(),
            event_type: EventType::BlockMeta,
            data: vec![1, 2, 3, 4],
            serialization_format: SerializationProtocol::Bincode,
            compression_format: fzstream_common::CompressionLevel::None,
            is_compressed: false,
            timestamp: now_ms,
            original_size: Some(4),
            grpc_arrival_time: 0,
            parsing_time: 0,
            completion_time: 0,
            client_processing_start: None,
            client_processing_end: None,
        };
        assert!(dispatcher.dispatch_event_ultra_fast("client_1", msg).is_ok());
        assert_eq!(dispatcher.get_performance_stats().events_processed, 1);
    }

    /// A serialized buffer round-trips through the pool.
    #[test]
    fn test_zero_alloc_serializer() {
        let pool = ZeroAllocSerializer::new(10, 1024);
        let payload = pool
            .serialize_zero_alloc(&"Hello, world!", "string")
            .expect("serialization should succeed");
        assert!(!payload.is_empty());
        pool.return_buffer(payload);
        let (available, capacity) = pool.get_pool_stats();
        assert!(available > 0);
        assert_eq!(capacity, 10);
    }
}