use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use anyhow::Result;
use log::{info, warn};
/// Applies and tracks OS-level real-time optimizations (scheduling class,
/// memory locking, CPU affinity, interrupt/NUMA/power hints) for
/// latency-sensitive workloads, and runs a background latency monitor.
pub struct RealtimeSystemOptimizer {
    // Effective configuration; empty core/node lists are auto-filled in `new`.
    config: RealtimeConfig,
    // Shared flags recording which optimizations actually took effect.
    optimization_state: Arc<OptimizationState>,
    // Shared counters sampled/updated by the monitoring task.
    stats: Arc<RealtimeStats>,
    // Set once `apply_all_optimizations` has run; prevents re-application.
    initialized: AtomicBool,
}
/// Tunables controlling which OS-level optimizations are attempted and with
/// what parameters. Empty core/node lists are auto-populated from the
/// detected topology when the optimizer is constructed.
#[derive(Debug, Clone)]
pub struct RealtimeConfig {
    /// Attempt SCHED_FIFO (falling back to SCHED_RR) scheduling on Linux.
    pub enable_realtime_scheduling: bool,
    /// Priority passed to `sched_setscheduler` (Linux RT range is 1..=99).
    pub realtime_priority: i32,
    /// Lock all current and future pages via `mlockall` to prevent swapping.
    pub enable_memory_locking: bool,
    /// RLIMIT_MEMLOCK soft/hard limit, in bytes.
    pub memory_lock_limit: usize,
    /// Pin the process to `isolated_cpu_cores` via `sched_setaffinity`.
    pub enable_cpu_isolation: bool,
    /// Cores reserved for real-time work; auto-filled with the last two
    /// cores on machines with more than 4 CPUs.
    pub isolated_cpu_cores: Vec<usize>,
    /// Log guidance for steering IRQs away from the isolated cores.
    pub enable_interrupt_isolation: bool,
    /// Cores intended to absorb interrupts; auto-filled with cores 0-1.
    /// NOTE(review): only logged — no IRQ affinity is actually written.
    pub interrupt_cpu_cores: Vec<usize>,
    /// Log NUMA placement guidance when kernel NUMA balancing is detected.
    pub enable_numa_optimization: bool,
    /// Preferred NUMA nodes (advisory; only logged by the visible code).
    pub preferred_numa_nodes: Vec<usize>,
    /// Log the recommended cpufreq governor setting.
    pub enable_power_optimization: bool,
    /// Desired Linux cpufreq governor (advisory; only logged).
    pub cpu_frequency_governor: CpuGovernor,
}
/// Linux cpufreq scaling governors; each variant maps to the corresponding
/// sysfs governor name (see `apply_power_optimization`).
#[derive(Debug, Clone)]
pub enum CpuGovernor {
    Performance,  // "performance"
    OnDemand,     // "ondemand"
    Userspace,    // "userspace"
    Conservative, // "conservative"
}
impl Default for RealtimeConfig {
    /// Default profile: every optimization enabled, RT priority 80, a 2 GiB
    /// memory-lock limit, and core/node lists left empty so they can be
    /// auto-detected when the optimizer is constructed.
    fn default() -> Self {
        const GIB: usize = 1024 * 1024 * 1024;
        Self {
            enable_realtime_scheduling: true,
            realtime_priority: 80,
            enable_memory_locking: true,
            memory_lock_limit: 2 * GIB,
            enable_cpu_isolation: true,
            isolated_cpu_cores: Vec::new(),
            enable_interrupt_isolation: true,
            interrupt_cpu_cores: Vec::new(),
            enable_numa_optimization: true,
            preferred_numa_nodes: Vec::new(),
            enable_power_optimization: true,
            cpu_frequency_governor: CpuGovernor::Performance,
        }
    }
}
/// Per-optimization success flags, shared between the optimizer and its
/// monitoring task. Each flag is stored `true` (Release ordering) only when
/// the corresponding optimization is successfully applied.
pub struct OptimizationState {
    pub realtime_scheduling_enabled: AtomicBool,
    pub memory_locked: AtomicBool,
    pub cpu_affinity_set: AtomicBool,
    pub interrupt_isolation_enabled: AtomicBool,
    pub numa_optimization_enabled: AtomicBool,
    pub power_optimization_enabled: AtomicBool,
}
impl Default for OptimizationState {
    /// All flags start `false`; they flip to `true` only when the matching
    /// optimization is applied successfully.
    fn default() -> Self {
        let off = || AtomicBool::new(false);
        Self {
            realtime_scheduling_enabled: off(),
            memory_locked: off(),
            cpu_affinity_set: off(),
            interrupt_isolation_enabled: off(),
            numa_optimization_enabled: off(),
            power_optimization_enabled: off(),
        }
    }
}
/// Runtime counters shared with the monitoring task.
/// NOTE(review): only the two latency fields are ever written by the code
/// visible here; `page_faults`, `context_switches`, `interrupts` and
/// `system_calls` are read in snapshots but never updated — presumably
/// populated elsewhere, or placeholders. Verify against the rest of the crate.
pub struct RealtimeStats {
    /// Most recent scheduling-latency sample, in nanoseconds.
    pub scheduling_latency_ns: AtomicU64,
    /// Largest scheduling-latency sample observed so far, in nanoseconds.
    pub max_scheduling_latency_ns: AtomicU64,
    pub page_faults: AtomicU64,
    pub context_switches: AtomicU64,
    pub interrupts: AtomicU64,
    pub system_calls: AtomicU64,
}
impl Default for RealtimeStats {
    /// Every counter starts at zero (`AtomicU64::default()` is `new(0)`).
    fn default() -> Self {
        Self {
            scheduling_latency_ns: AtomicU64::default(),
            max_scheduling_latency_ns: AtomicU64::default(),
            page_faults: AtomicU64::default(),
            context_switches: AtomicU64::default(),
            interrupts: AtomicU64::default(),
            system_calls: AtomicU64::default(),
        }
    }
}
impl RealtimeSystemOptimizer {
pub fn new(mut config: RealtimeConfig) -> Result<Self> {
Self::auto_detect_system_config(&mut config)?;
info!("🚀 Creating RealtimeSystemOptimizer with config: {:?}", config);
Ok(Self {
config,
optimization_state: Arc::new(OptimizationState::default()),
stats: Arc::new(RealtimeStats::default()),
initialized: AtomicBool::new(false),
})
}
fn auto_detect_system_config(config: &mut RealtimeConfig) -> Result<()> {
let num_cpus = num_cpus::get();
info!("🧠 Detected {} CPU cores", num_cpus);
if config.isolated_cpu_cores.is_empty() && num_cpus > 4 {
config.isolated_cpu_cores = ((num_cpus - 2)..num_cpus).collect();
info!("🎯 Auto-configured isolated CPU cores: {:?}", config.isolated_cpu_cores);
}
if config.interrupt_cpu_cores.is_empty() && num_cpus > 2 {
config.interrupt_cpu_cores = (0..2).collect();
info!("⚡ Auto-configured interrupt CPU cores: {:?}", config.interrupt_cpu_cores);
}
Self::detect_numa_topology(config)?;
Ok(())
}
#[allow(unused_variables)]
fn detect_numa_topology(config: &mut RealtimeConfig) -> Result<()> {
#[cfg(target_os = "linux")]
{
if let Ok(numa_info) = std::fs::read_to_string("/proc/sys/kernel/numa_balancing") {
if numa_info.trim() == "1" {
info!("🏗️ NUMA balancing detected - will optimize for NUMA");
if config.preferred_numa_nodes.is_empty() {
config.preferred_numa_nodes = vec![0]; }
}
}
}
Ok(())
}
pub async fn apply_all_optimizations(&self) -> Result<()> {
if self.initialized.load(Ordering::Acquire) {
warn!("Real-time optimizations already applied");
return Ok(());
}
info!("🚀 Applying real-time system optimizations...");
if self.config.enable_realtime_scheduling {
self.apply_realtime_scheduling().await?;
}
if self.config.enable_memory_locking {
self.apply_memory_locking().await?;
}
if self.config.enable_cpu_isolation {
self.apply_cpu_isolation().await?;
}
if self.config.enable_interrupt_isolation {
self.apply_interrupt_isolation().await?;
}
if self.config.enable_numa_optimization {
self.apply_numa_optimization().await?;
}
if self.config.enable_power_optimization {
self.apply_power_optimization().await?;
}
self.start_realtime_monitoring().await;
self.initialized.store(true, Ordering::Release);
info!("✅ All real-time optimizations applied successfully");
Ok(())
}
async fn apply_realtime_scheduling(&self) -> Result<()> {
info!("⏰ Applying real-time scheduling optimizations...");
#[cfg(target_os = "linux")]
{
use libc::{sched_setscheduler, sched_param, SCHED_FIFO, SCHED_RR};
let mut param: sched_param = unsafe { std::mem::zeroed() };
param.sched_priority = self.config.realtime_priority;
unsafe {
if sched_setscheduler(0, SCHED_FIFO, ¶m) == 0 {
info!("✅ Real-time FIFO scheduling enabled with priority {}",
self.config.realtime_priority);
self.optimization_state.realtime_scheduling_enabled.store(true, Ordering::Release);
} else {
if sched_setscheduler(0, SCHED_RR, ¶m) == 0 {
info!("✅ Real-time RR scheduling enabled with priority {}",
self.config.realtime_priority);
self.optimization_state.realtime_scheduling_enabled.store(true, Ordering::Release);
} else {
warn!("⚠️ Failed to set real-time scheduling (requires root privileges)");
}
}
}
}
#[cfg(target_os = "macos")]
{
warn!("⚠️ Real-time scheduling not available on macOS");
}
#[cfg(not(unix))]
{
warn!("⚠️ Real-time scheduling optimization not supported on this platform");
}
Ok(())
}
async fn apply_memory_locking(&self) -> Result<()> {
info!("🔒 Applying memory locking optimizations...");
#[cfg(unix)]
{
use libc::{mlockall, MCL_CURRENT, MCL_FUTURE, setrlimit, rlimit, RLIMIT_MEMLOCK};
let rlim = rlimit {
rlim_cur: self.config.memory_lock_limit as u64,
rlim_max: self.config.memory_lock_limit as u64,
};
unsafe {
if setrlimit(RLIMIT_MEMLOCK, &rlim) == 0 {
info!("✅ Memory lock limit set to {} bytes", self.config.memory_lock_limit);
} else {
warn!("⚠️ Failed to set memory lock limit");
}
if mlockall(MCL_CURRENT | MCL_FUTURE) == 0 {
info!("✅ All memory pages locked to prevent swapping");
self.optimization_state.memory_locked.store(true, Ordering::Release);
} else {
warn!("⚠️ Failed to lock memory pages (requires sufficient limits)");
}
}
}
#[cfg(not(unix))]
{
warn!("⚠️ Memory locking optimization not supported on this platform");
}
Ok(())
}
async fn apply_cpu_isolation(&self) -> Result<()> {
info!("🎯 Applying CPU isolation optimizations...");
if self.config.isolated_cpu_cores.is_empty() {
warn!("No isolated CPU cores configured");
return Ok(());
}
#[cfg(target_os = "linux")]
{
use libc::{cpu_set_t, sched_setaffinity, CPU_ZERO, CPU_SET};
use std::mem;
let mut cpu_set: cpu_set_t = unsafe { mem::zeroed() };
unsafe {
CPU_ZERO(&mut cpu_set);
for &core_id in &self.config.isolated_cpu_cores {
if core_id < 256 { CPU_SET(core_id, &mut cpu_set);
}
}
if sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &cpu_set) == 0 {
info!("✅ CPU affinity set to isolated cores: {:?}",
self.config.isolated_cpu_cores);
self.optimization_state.cpu_affinity_set.store(true, Ordering::Release);
} else {
warn!("⚠️ Failed to set CPU affinity");
}
}
}
#[cfg(target_os = "macos")]
{
warn!("⚠️ CPU affinity not available on macOS");
}
#[cfg(not(unix))]
{
warn!("⚠️ CPU isolation optimization not supported on this platform");
}
Ok(())
}
async fn apply_interrupt_isolation(&self) -> Result<()> {
info!("⚡ Applying interrupt isolation optimizations...");
#[cfg(target_os = "linux")]
{
info!("💡 For interrupt isolation, consider:");
info!(" - Using isolcpus=<isolated_cores> kernel parameter");
info!(" - Configuring IRQ affinity via /proc/irq/*/smp_affinity");
info!(" - Using rcu_nocbs=<isolated_cores> for RCU callbacks");
if !self.config.interrupt_cpu_cores.is_empty() {
info!("🎯 Interrupt handling will use cores: {:?}",
self.config.interrupt_cpu_cores);
self.optimization_state.interrupt_isolation_enabled.store(true, Ordering::Release);
}
}
Ok(())
}
async fn apply_numa_optimization(&self) -> Result<()> {
info!("🏗️ Applying NUMA optimizations...");
#[cfg(target_os = "linux")]
{
if !self.config.preferred_numa_nodes.is_empty() {
info!("🎯 Preferred NUMA nodes: {:?}", self.config.preferred_numa_nodes);
info!("💡 For NUMA optimization, consider:");
info!(" - numactl --membind=<nodes> --cpunodebind=<nodes>");
info!(" - Setting vm.zone_reclaim_mode=1");
info!(" - Using NUMA-aware memory allocation");
self.optimization_state.numa_optimization_enabled.store(true, Ordering::Release);
}
}
Ok(())
}
async fn apply_power_optimization(&self) -> Result<()> {
info!("🔋 Applying power management optimizations...");
#[cfg(target_os = "linux")]
{
let governor = match self.config.cpu_frequency_governor {
CpuGovernor::Performance => "performance",
CpuGovernor::OnDemand => "ondemand",
CpuGovernor::Userspace => "userspace",
CpuGovernor::Conservative => "conservative",
};
info!("💡 CPU frequency governor should be set to: {}", governor);
info!(" Execute: echo {} | sudo tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor", governor);
info!(" Also consider disabling C-states: intel_idle.max_cstate=0");
self.optimization_state.power_optimization_enabled.store(true, Ordering::Release);
}
Ok(())
}
async fn start_realtime_monitoring(&self) {
let stats = Arc::clone(&self.stats);
let state = Arc::clone(&self.optimization_state);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
let start = Instant::now();
thread::yield_now();
let scheduling_latency = start.elapsed().as_nanos() as u64;
stats.scheduling_latency_ns.store(scheduling_latency, Ordering::Relaxed);
let max_latency = stats.max_scheduling_latency_ns.load(Ordering::Relaxed);
if scheduling_latency > max_latency {
stats.max_scheduling_latency_ns.store(scheduling_latency, Ordering::Relaxed);
}
let rt_enabled = state.realtime_scheduling_enabled.load(Ordering::Relaxed);
let mem_locked = state.memory_locked.load(Ordering::Relaxed);
let cpu_affinity = state.cpu_affinity_set.load(Ordering::Relaxed);
if scheduling_latency > 100_000 { warn!("⚠️ High scheduling latency detected: {}μs", scheduling_latency / 1000);
}
static mut COUNTER: u32 = 0;
unsafe {
COUNTER += 1;
if COUNTER % 12 == 0 { info!("📊 Real-time Status:");
info!(" ⏰ RT Scheduling: {}", if rt_enabled { "✅" } else { "❌" });
info!(" 🔒 Memory Locked: {}", if mem_locked { "✅" } else { "❌" });
info!(" 🎯 CPU Affinity: {}", if cpu_affinity { "✅" } else { "❌" });
info!(" 📈 Scheduling Latency: {}ns (max: {}ns)",
scheduling_latency,
stats.max_scheduling_latency_ns.load(Ordering::Relaxed));
}
}
}
});
}
pub fn get_stats(&self) -> RealtimeStatsSnapshot {
RealtimeStatsSnapshot {
scheduling_latency_ns: self.stats.scheduling_latency_ns.load(Ordering::Relaxed),
max_scheduling_latency_ns: self.stats.max_scheduling_latency_ns.load(Ordering::Relaxed),
page_faults: self.stats.page_faults.load(Ordering::Relaxed),
context_switches: self.stats.context_switches.load(Ordering::Relaxed),
interrupts: self.stats.interrupts.load(Ordering::Relaxed),
system_calls: self.stats.system_calls.load(Ordering::Relaxed),
}
}
pub fn get_optimization_status(&self) -> OptimizationStatus {
OptimizationStatus {
realtime_scheduling_enabled: self.optimization_state.realtime_scheduling_enabled.load(Ordering::Relaxed),
memory_locked: self.optimization_state.memory_locked.load(Ordering::Relaxed),
cpu_affinity_set: self.optimization_state.cpu_affinity_set.load(Ordering::Relaxed),
interrupt_isolation_enabled: self.optimization_state.interrupt_isolation_enabled.load(Ordering::Relaxed),
numa_optimization_enabled: self.optimization_state.numa_optimization_enabled.load(Ordering::Relaxed),
power_optimization_enabled: self.optimization_state.power_optimization_enabled.load(Ordering::Relaxed),
}
}
pub fn ultra_low_latency_config() -> RealtimeConfig {
let num_cpus = num_cpus::get();
RealtimeConfig {
enable_realtime_scheduling: true,
realtime_priority: 99, enable_memory_locking: true,
memory_lock_limit: 8 * 1024 * 1024 * 1024, enable_cpu_isolation: true,
isolated_cpu_cores: if num_cpus > 4 {
((num_cpus - 2)..num_cpus).collect()
} else {
vec![]
},
enable_interrupt_isolation: true,
interrupt_cpu_cores: (0..2).collect(),
enable_numa_optimization: true,
preferred_numa_nodes: vec![0],
enable_power_optimization: true,
cpu_frequency_governor: CpuGovernor::Performance,
}
}
}
/// Plain-value snapshot of [`RealtimeStats`], taken with relaxed loads in
/// `get_stats`. All values are in the units of the source counters
/// (latencies in nanoseconds).
#[derive(Debug, Clone)]
pub struct RealtimeStatsSnapshot {
    pub scheduling_latency_ns: u64,
    pub max_scheduling_latency_ns: u64,
    pub page_faults: u64,
    pub context_switches: u64,
    pub interrupts: u64,
    pub system_calls: u64,
}
/// Plain-value snapshot of [`OptimizationState`], taken with relaxed loads
/// in `get_optimization_status`. Each flag is `true` only if the matching
/// optimization was successfully applied.
#[derive(Debug, Clone)]
pub struct OptimizationStatus {
    pub realtime_scheduling_enabled: bool,
    pub memory_locked: bool,
    pub cpu_affinity_set: bool,
    pub interrupt_isolation_enabled: bool,
    pub numa_optimization_enabled: bool,
    pub power_optimization_enabled: bool,
}
#[cfg(test)]
mod tests {
    use super::*;
    // Construction must succeed without privileges, and no optimization flag
    // should be set before apply_all_optimizations() is called.
    #[tokio::test]
    async fn test_realtime_optimizer_creation() {
        let config = RealtimeConfig::default();
        let optimizer = RealtimeSystemOptimizer::new(config).unwrap();
        let status = optimizer.get_optimization_status();
        assert!(!status.realtime_scheduling_enabled);
    }
    // The aggressive preset pins the documented extremes: RT priority 99 and
    // an 8 GiB memory-lock limit.
    #[tokio::test]
    async fn test_ultra_low_latency_config() {
        let config = RealtimeSystemOptimizer::ultra_low_latency_config();
        assert!(config.enable_realtime_scheduling);
        assert_eq!(config.realtime_priority, 99);
        assert!(config.enable_memory_locking);
        assert_eq!(config.memory_lock_limit, 8 * 1024 * 1024 * 1024);
    }
    // A fresh optimizer reports all-zero stats (counters default to 0 and the
    // monitor has not been started).
    #[test]
    fn test_stats_snapshot() {
        let optimizer = RealtimeSystemOptimizer::new(RealtimeConfig::default()).unwrap();
        let stats = optimizer.get_stats();
        assert_eq!(stats.scheduling_latency_ns, 0);
        assert_eq!(stats.max_scheduling_latency_ns, 0);
        assert_eq!(stats.page_faults, 0);
    }
}