use rayon::prelude::*;
use scirs2_core::metrics::{Counter, Histogram};
use scirs2_core::ndarray_ext::{Array1, Array2};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use crate::error::{ClusterError, Result};
use crate::raft::OxirsNodeId;
/// Compute backend used for cluster analytics operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum GpuBackend {
    /// NVIDIA CUDA (requires the `cuda` cargo feature and a usable device).
    Cuda,
    /// Apple Metal (requires the `metal` cargo feature and macOS).
    Metal,
    /// Rayon-based multi-threaded CPU path; always available, used as fallback.
    ParallelCpu,
}
/// Configuration for [`GpuAcceleratedCluster`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuConfig {
    /// Requested backend; initialization may downgrade to `ParallelCpu`.
    pub backend: GpuBackend,
    /// Allow reduced-precision GPU kernels.
    /// NOTE(review): not read anywhere in this file — confirm consumers.
    pub enable_mixed_precision: bool,
    /// Batch size; in this file it sizes the synthetic warmup workload.
    pub batch_size: usize,
    /// Enable tensor-core code paths on capable hardware.
    /// NOTE(review): not read anywhere in this file — confirm consumers.
    pub enable_tensor_cores: bool,
    /// GPU memory budget in bytes (default 2 GiB).
    /// NOTE(review): not enforced in this file — confirm consumers.
    pub memory_limit_bytes: usize,
    /// Automatically fall back to CPU on GPU failure.
    /// NOTE(review): the fallback in `initialize_gpu` is unconditional and
    /// does not consult this flag — confirm intent.
    pub auto_cpu_fallback: bool,
    /// Number of warmup passes run at construction time.
    pub warmup_iterations: usize,
}
impl Default for GpuConfig {
fn default() -> Self {
Self {
backend: GpuBackend::ParallelCpu, enable_mixed_precision: true,
batch_size: 256,
enable_tensor_cores: true,
memory_limit_bytes: 2 * 1024 * 1024 * 1024, auto_cpu_fallback: true,
warmup_iterations: 10,
}
}
}
/// Point-in-time health/load metrics for a single replica; input to
/// replica selection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaMetrics {
    /// Raft node identifier of the replica.
    pub node_id: OxirsNodeId,
    /// Observed request latency in milliseconds.
    pub latency_ms: f64,
    /// Active connection count (f64 so it feeds the float feature matrix directly).
    pub connections: f64,
    /// Replication lag in milliseconds.
    pub lag_ms: f64,
    /// CPU utilization — presumably a fraction in [0, 1]; confirm at callers.
    pub cpu_util: f64,
    /// Memory utilization — presumably a fraction in [0, 1]; confirm at callers.
    pub mem_util: f64,
    /// Request success rate; higher is better (scored positively, not inverted).
    pub success_rate: f64,
}
/// Inputs for load forecasting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoadForecastParams {
    /// Historical load samples — assumed oldest-first, since the trend is
    /// extrapolated past the final element; confirm at callers.
    pub history: Vec<f64>,
    /// Number of future steps to predict.
    pub horizon: usize,
    /// Confidence level for the prediction interval (e.g. 0.95).
    pub confidence_level: f64,
    /// Whether to attempt seasonal decomposition (fixed period of 12 steps).
    pub detect_seasonality: bool,
}
/// Result of a load forecast.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoadForecast {
    /// Point predictions, one per horizon step (clamped to >= 0).
    pub predictions: Vec<f64>,
    /// Upper confidence bound per step.
    pub upper_bound: Vec<f64>,
    /// Lower confidence bound per step (clamped to >= 0).
    pub lower_bound: Vec<f64>,
    /// Average per-step change of the extracted trend component.
    pub trend_slope: f64,
    /// Detected seasonal period in steps, if any.
    pub seasonality_period: Option<usize>,
    /// In-sample accuracy proxy in [0, 1]: 1 - residual_std / mean level.
    pub accuracy_score: f64,
}
/// Cluster analytics engine (replica selection, load forecasting) that
/// prefers a GPU backend and falls back to a rayon-parallel CPU path.
pub struct GpuAcceleratedCluster {
    /// Static configuration supplied at construction.
    config: GpuConfig,
    /// Total operations routed through the public entry points.
    gpu_operation_counter: Counter,
    /// Per-operation wall-clock latency in milliseconds.
    gpu_latency_histogram: Histogram,
    /// Operations that fell back to the CPU path.
    /// NOTE(review): never incremented in this file — confirm intended use.
    cpu_fallback_counter: Counter,
    /// Backend actually selected during initialization (may differ from
    /// `config.backend` if the requested GPU was unavailable).
    active_backend: Arc<RwLock<GpuBackend>>,
}
impl GpuAcceleratedCluster {
/// Construct the cluster engine: probe/initialize the configured backend
/// (falling back to parallel CPU when unavailable), then run warmup passes.
///
/// # Errors
/// Propagates failures from backend initialization or kernel warmup.
pub async fn new(config: GpuConfig) -> Result<Self> {
    info!(
        "Initializing GPU-accelerated cluster with backend: {:?}",
        config.backend
    );
    // `GpuBackend` is `Copy`, so we can keep using `resolved_backend`
    // after handing it to the RwLock below.
    let resolved_backend = Self::initialize_gpu(&config).await?;
    let cluster = Self {
        config,
        gpu_operation_counter: Counter::new("gpu_operations".to_string()),
        gpu_latency_histogram: Histogram::new("gpu_latency_ms".to_string()),
        cpu_fallback_counter: Counter::new("cpu_fallback_operations".to_string()),
        active_backend: Arc::new(RwLock::new(resolved_backend)),
    };
    cluster.warmup_gpu_kernels().await?;
    info!(
        "GPU-accelerated cluster initialized with backend: {:?}",
        resolved_backend
    );
    Ok(cluster)
}
/// Bring up the configured backend, degrading to the parallel-CPU backend
/// whenever the requested GPU backend is not compiled in or fails to
/// initialize. Returns the backend that is actually usable.
///
/// NOTE(review): the fallback is unconditional — it does not consult
/// `config.auto_cpu_fallback`; confirm whether that flag should gate it.
async fn initialize_gpu(config: &GpuConfig) -> Result<GpuBackend> {
    match config.backend {
        GpuBackend::Cuda => {
            // CUDA path only exists when compiled with the `cuda` feature.
            #[cfg(feature = "cuda")]
            {
                use scirs2_core::gpu::{GpuBackend as ScirGpuBackend, GpuContext};
                // Creating a context is used purely as an availability
                // probe; the context itself is dropped immediately.
                match GpuContext::new(ScirGpuBackend::Cuda) {
                    Ok(_ctx) => {
                        info!("CUDA backend initialized successfully");
                        Ok(GpuBackend::Cuda)
                    }
                    Err(e) => {
                        warn!("Failed to initialize CUDA: {}, using parallel CPU", e);
                        Ok(GpuBackend::ParallelCpu)
                    }
                }
            }
            #[cfg(not(feature = "cuda"))]
            {
                warn!("CUDA support not compiled, using parallel CPU");
                Ok(GpuBackend::ParallelCpu)
            }
        }
        GpuBackend::Metal => {
            // Metal requires both the `metal` feature and macOS.
            #[cfg(all(feature = "metal", target_os = "macos"))]
            {
                use scirs2_core::gpu::{GpuBackend as ScirGpuBackend, GpuContext};
                match GpuContext::new(ScirGpuBackend::Metal) {
                    Ok(_ctx) => {
                        info!("Metal backend initialized successfully");
                        Ok(GpuBackend::Metal)
                    }
                    Err(e) => {
                        warn!("Failed to initialize Metal: {}, using parallel CPU", e);
                        Ok(GpuBackend::ParallelCpu)
                    }
                }
            }
            #[cfg(not(all(feature = "metal", target_os = "macos")))]
            {
                warn!("Metal support not available on this platform, using parallel CPU");
                Ok(GpuBackend::ParallelCpu)
            }
        }
        GpuBackend::ParallelCpu => {
            info!("Using high-performance parallel CPU backend");
            Ok(GpuBackend::ParallelCpu)
        }
    }
}
/// Exercise the selection kernels a few times with synthetic data so the
/// first real request does not pay one-off initialization costs.
async fn warmup_gpu_kernels(&self) -> Result<()> {
    info!(
        "Warming up GPU kernels with {} iterations",
        self.config.warmup_iterations
    );
    // A batch of identical synthetic replicas, sized by `batch_size`.
    let synthetic: Vec<ReplicaMetrics> = (0..self.config.batch_size)
        .map(|id| ReplicaMetrics {
            node_id: id as u64,
            latency_ms: 10.0,
            connections: 5.0,
            lag_ms: 100.0,
            cpu_util: 0.5,
            mem_util: 0.5,
            success_rate: 1.0,
        })
        .collect();
    for iteration in 1..=self.config.warmup_iterations {
        let started = Instant::now();
        // Result intentionally discarded — only the execution matters here.
        let _ = self.select_best_replica_parallel(&synthetic).await;
        debug!("Warmup iteration {}: {:?}", iteration, started.elapsed());
    }
    info!("GPU kernel warmup complete");
    Ok(())
}
/// Public entry point for replica selection: records operation count and
/// latency around the parallel implementation.
pub async fn select_best_replica(
    &self,
    replica_metrics: &[ReplicaMetrics],
) -> Result<(OxirsNodeId, f64)> {
    self.gpu_operation_counter.inc();
    let timer = Instant::now();
    let selection = self.select_best_replica_parallel(replica_metrics).await;
    // Latency is recorded in milliseconds regardless of outcome.
    self.gpu_latency_histogram
        .observe(timer.elapsed().as_secs_f64() * 1000.0);
    selection
}
/// Score every replica in parallel and return `(node_id, confidence)` for
/// the highest-scoring one. Note: the second tuple element is a
/// *confidence* value in (0, 1], not the raw score.
///
/// # Errors
/// Returns `ClusterError::Other` when `replica_metrics` is empty.
async fn select_best_replica_parallel(
    &self,
    replica_metrics: &[ReplicaMetrics],
) -> Result<(OxirsNodeId, f64)> {
    if replica_metrics.is_empty() {
        return Err(ClusterError::Other("No replicas available".to_string()));
    }
    // One row per replica; columns: latency, connections, lag, cpu, mem,
    // success rate (see extract_feature_matrix).
    let features = self.extract_feature_matrix(replica_metrics);
    // Per-column weights in the same order as above; they sum to 1.0.
    let weights = [0.25, 0.15, 0.20, 0.15, 0.10, 0.15];
    let scores = self.compute_scores_parallel(&features, &weights).await?;
    // Highest score wins; NaN scores compare as Equal, so they never win
    // a comparison outright.
    let (best_idx, best_score) = scores
        .iter()
        .enumerate()
        .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
        .ok_or_else(|| ClusterError::Other("Failed to find best replica".to_string()))?;
    let best_node_id = replica_metrics[best_idx].node_id;
    // Confidence heuristic: how far the winner stands above the spread of
    // all scores; degenerates to 1.0 when every score is identical.
    let score_std = Self::compute_std(&scores);
    let confidence = if score_std > 0.0 {
        (best_score / (best_score + score_std)).min(1.0)
    } else {
        1.0
    };
    debug!(
        "Parallel replica selection: node={}, score={:.4}, confidence={:.4}",
        best_node_id, best_score, confidence
    );
    Ok((best_node_id, confidence))
}
/// Build an `(n_replicas x 6)` matrix of normalized features.
/// Columns: latency, connections, replication lag, cpu, memory, success rate.
fn extract_feature_matrix(&self, replica_metrics: &[ReplicaMetrics]) -> Array2<f64> {
    let mut features = Array2::zeros((replica_metrics.len(), 6));
    for (row, m) in replica_metrics.iter().enumerate() {
        // Unbounded metrics are squashed into [0, 1]; utilization and
        // success rate are assumed to already be fractions.
        features[[row, 0]] = (m.latency_ms / 1000.0).min(1.0);
        features[[row, 1]] = (m.connections / 100.0).min(1.0);
        features[[row, 2]] = (m.lag_ms / 1000.0).min(1.0);
        features[[row, 3]] = m.cpu_util;
        features[[row, 4]] = m.mem_util;
        features[[row, 5]] = m.success_rate;
    }
    features
}
/// Compute one weighted score per feature row, in parallel across rows.
/// "Bad" features (columns 0-4) contribute via exp(-feature) so lower is
/// better; the success-rate column (5) contributes linearly.
async fn compute_scores_parallel(
    &self,
    features: &Array2<f64>,
    weights: &[f64; 6],
) -> Result<Array1<f64>> {
    // Materialize owned rows so rayon can iterate them without borrowing
    // the Array2 across threads.
    let rows: Vec<_> = (0..features.nrows())
        .map(|i| features.row(i).to_owned())
        .collect();
    let scores: Vec<f64> = rows
        .par_iter()
        .map(|row| {
            weights
                .iter()
                .enumerate()
                .map(|(j, &weight)| {
                    let feature = row[j];
                    if j == 5 {
                        // Success rate: higher is better, taken as-is.
                        weight * feature
                    } else {
                        // Cost-like feature: decays toward 0 as it grows.
                        weight * (-feature).exp()
                    }
                })
                .sum()
        })
        .collect();
    Ok(Array1::from_vec(scores))
}
/// Public entry point for load forecasting: records operation count and
/// latency around the parallel implementation.
pub async fn forecast_load(&self, params: LoadForecastParams) -> Result<LoadForecast> {
    self.gpu_operation_counter.inc();
    let timer = Instant::now();
    let forecast = self.forecast_load_parallel(&params).await;
    // Latency is recorded in milliseconds regardless of outcome.
    self.gpu_latency_histogram
        .observe(timer.elapsed().as_secs_f64() * 1000.0);
    forecast
}
/// Core forecasting routine: decompose the history, extrapolate trend and
/// seasonality, then attach residual-based confidence bounds.
///
/// # Errors
/// Returns `ClusterError::Other` when `params.history` is empty.
async fn forecast_load_parallel(&self, params: &LoadForecastParams) -> Result<LoadForecast> {
    if params.history.is_empty() {
        return Err(ClusterError::Other(
            "No historical data available".to_string(),
        ));
    }
    let history = Array1::from_vec(params.history.clone());
    // Split into trend + seasonal + residual components.
    let (trend, seasonal, residual) = self
        .decompose_time_series_parallel(&history, params.detect_seasonality)
        .await?;
    let trend_forecast = self.forecast_trend_parallel(&trend, params.horizon).await?;
    let seasonal_forecast = if params.detect_seasonality && !seasonal.is_empty() {
        self.forecast_seasonal_parallel(&seasonal, params.horizon)
            .await?
    } else {
        // No seasonal signal: contribute zeros to every step.
        Array1::zeros(params.horizon)
    };
    // Combine components; load cannot be negative, so clamp at zero.
    let mut predictions = Vec::with_capacity(params.horizon);
    for i in 0..params.horizon {
        let pred = trend_forecast[i] + seasonal_forecast[i];
        predictions.push(pred.max(0.0));
    }
    // Interval half-width = z * residual stddev (normal-error assumption);
    // the lower bound is clamped at zero like the predictions.
    let residual_std = Self::compute_std(&residual);
    let z_score = Self::inverse_normal_cdf(params.confidence_level);
    let mut upper_bound = Vec::with_capacity(params.horizon);
    let mut lower_bound = Vec::with_capacity(params.horizon);
    for &pred in &predictions {
        upper_bound.push(pred + z_score * residual_std);
        lower_bound.push((pred - z_score * residual_std).max(0.0));
    }
    let seasonality_period = if params.detect_seasonality {
        self.detect_seasonality_period_parallel(&history).await?
    } else {
        None
    };
    // Average per-step change across the trend component.
    let trend_slope = if trend.len() >= 2 {
        (trend[trend.len() - 1] - trend[0]) / (trend.len() - 1) as f64
    } else {
        0.0
    };
    // Crude in-sample accuracy proxy: 1 - residual_std / mean level,
    // clamped to [0, 1]; 0.5 when the mean is non-positive.
    let mean_val = history.mean().unwrap_or(1.0);
    let accuracy_score = if mean_val > 0.0 {
        (1.0 - residual_std / mean_val).max(0.0).min(1.0)
    } else {
        0.5
    };
    Ok(LoadForecast {
        predictions,
        upper_bound,
        lower_bound,
        trend_slope,
        seasonality_period,
        accuracy_score,
    })
}
/// Decompose `history` into `(trend, seasonal, residual)`.
///
/// The trend is a centered moving average (window 12 when seasonality is
/// requested, else 5). A fixed period of 12 is assumed for the seasonal
/// part, which is only extracted when at least two full cycles (24 points)
/// exist; otherwise it is all zeros and the detrended series is the
/// residual.
async fn decompose_time_series_parallel(
    &self,
    history: &Array1<f64>,
    detect_seasonality: bool,
) -> Result<(Array1<f64>, Array1<f64>, Array1<f64>)> {
    let n = history.len();
    let window = if detect_seasonality { 12 } else { 5 };
    let trend = self.moving_average_parallel(history, window).await?;
    let detrended = history - &trend;
    let (seasonal, residual) = if detect_seasonality && n >= 24 {
        let seasonal = self.extract_seasonal_parallel(&detrended, 12).await?;
        let residual = &detrended - &seasonal;
        (seasonal, residual)
    } else {
        (Array1::zeros(n), detrended)
    };
    Ok((trend, seasonal, residual))
}
async fn moving_average_parallel(
&self,
data: &Array1<f64>,
window: usize,
) -> Result<Array1<f64>> {
let n = data.len();
let mut result = Array1::zeros(n);
if window >= n {
let mean = data.mean().unwrap_or(0.0);
result.fill(mean);
return Ok(result);
}
let half_window = window / 2;
let indices: Vec<usize> = (0..n).collect();
let averages: Vec<f64> = indices
.par_iter()
.map(|&i| {
let start = i.saturating_sub(half_window);
let end = (i + half_window + 1).min(n);
let slice = data.slice(s![start..end]);
slice.mean().unwrap_or(0.0)
})
.collect();
for (i, &avg) in averages.iter().enumerate() {
result[i] = avg;
}
Ok(result)
}
/// Average the detrended series at each position within the cycle and
/// broadcast that average back across the series as the seasonal
/// component.
///
/// Fixes over the previous version:
/// - observations in a trailing partial cycle are now both included in
///   the per-position average and assigned a seasonal value (previously
///   they were skipped entirely and left at 0.0);
/// - `period == 0` is rejected up front instead of panicking on `n / 0`.
async fn extract_seasonal_parallel(
    &self,
    detrended: &Array1<f64>,
    period: usize,
) -> Result<Array1<f64>> {
    let n = detrended.len();
    let mut seasonal = Array1::zeros(n);
    // Need a valid period and at least one full cycle plus change for a
    // meaningful seasonal estimate.
    if period == 0 || period >= n {
        return Ok(seasonal);
    }
    for pos in 0..period {
        // Every index sharing this position within the cycle, including
        // any trailing partial cycle. Non-empty because pos < period < n.
        let indices: Vec<usize> = (pos..n).step_by(period).collect();
        let avg = indices.iter().map(|&i| detrended[i]).sum::<f64>() / indices.len() as f64;
        for &idx in &indices {
            seasonal[idx] = avg;
        }
    }
    Ok(seasonal)
}
/// Fit an ordinary-least-squares line to the trend component and
/// extrapolate it `horizon` steps past the end of the series.
async fn forecast_trend_parallel(
    &self,
    trend: &Array1<f64>,
    horizon: usize,
) -> Result<Array1<f64>> {
    let n = trend.len();
    // Means of the x (time index) and y (trend value) series.
    let x_mean = (0..n).map(|i| i as f64).sum::<f64>() / n as f64;
    let y_mean = trend.mean().unwrap_or(0.0);
    // Accumulate the covariance (num) and variance (den) terms in one pass.
    let (num, den) = (0..n).fold((0.0_f64, 0.0_f64), |(num, den), i| {
        let dx = i as f64 - x_mean;
        let dy = trend[i] - y_mean;
        (num + dx * dy, den + dx * dx)
    });
    // A flat or single-point series has zero variance: slope defaults to 0.
    let slope = if den != 0.0 { num / den } else { 0.0 };
    let intercept = y_mean - slope * x_mean;
    let projected: Vec<f64> = (0..horizon)
        .map(|step| slope * (n + step) as f64 + intercept)
        .collect();
    Ok(Array1::from_vec(projected))
}
/// Repeat the extracted seasonal pattern cyclically across the forecast
/// horizon.
///
/// Fix: guards against an empty `seasonal` series, where `i % n` would
/// panic with a remainder-by-zero. The current caller checks emptiness
/// first, but the guard makes this function safe on its own.
async fn forecast_seasonal_parallel(
    &self,
    seasonal: &Array1<f64>,
    horizon: usize,
) -> Result<Array1<f64>> {
    let n = seasonal.len();
    if n == 0 {
        // No pattern to repeat: contribute nothing to the forecast.
        return Ok(Array1::zeros(horizon));
    }
    let mut forecast = Array1::zeros(horizon);
    for i in 0..horizon {
        forecast[i] = seasonal[i % n];
    }
    Ok(forecast)
}
/// Detect a dominant seasonal period via lag autocorrelation.
///
/// Scans lags 2..=min(24, n/2) and returns the lag with the highest
/// autocorrelation, provided it exceeds 0.5; `None` when the series is
/// shorter than 24 points or no lag clears the threshold.
async fn detect_seasonality_period_parallel(
    &self,
    history: &Array1<f64>,
) -> Result<Option<usize>> {
    let n = history.len();
    if n < 24 {
        return Ok(None);
    }
    let mean = history.mean().unwrap_or(0.0);
    let max_lag = 24.min(n / 2);
    let mut max_corr = 0.0;
    let mut best_lag = None;
    for lag in 2..=max_lag {
        // Pearson-style correlation between the series and itself shifted
        // by `lag`, using deviations from the global mean.
        let mut sum_xy = 0.0;
        let mut sum_x2 = 0.0;
        let mut sum_y2 = 0.0;
        for i in 0..(n - lag) {
            let x = history[i] - mean;
            let y = history[i + lag] - mean;
            sum_xy += x * y;
            sum_x2 += x * x;
            sum_y2 += y * y;
        }
        let corr = if sum_x2 > 0.0 && sum_y2 > 0.0 {
            sum_xy / (sum_x2 * sum_y2).sqrt()
        } else {
            0.0
        };
        // Keep the strongest correlation above the 0.5 significance cutoff.
        if corr > max_corr && corr > 0.5 {
            max_corr = corr;
            best_lag = Some(lag);
        }
    }
    Ok(best_lag)
}
fn compute_std(data: &Array1<f64>) -> f64 {
if data.is_empty() {
return 0.0;
}
let mean = data.mean().unwrap_or(0.0);
let variance = data.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / data.len() as f64;
variance.sqrt()
}
/// Map a confidence level to an approximate standard-normal z-score using
/// a small lookup table; anything below 0.80 falls back to 1.0.
fn inverse_normal_cdf(p: f64) -> f64 {
    // (confidence level, z-score) pairs, highest level first.
    const Z_TABLE: [(f64, f64); 4] = [
        (0.99, 2.576),
        (0.95, 1.96),
        (0.90, 1.645),
        (0.80, 1.282),
    ];
    for &(level, z) in Z_TABLE.iter() {
        if p >= level {
            return z;
        }
    }
    1.0
}
/// Backend selected at initialization; may differ from the configured one
/// when the requested GPU backend was unavailable.
pub async fn get_active_backend(&self) -> GpuBackend {
    *self.active_backend.read().await
}
/// Snapshot of operation counters and latency statistics.
pub async fn get_performance_stats(&self) -> GpuPerformanceStats {
    let stats = self.gpu_latency_histogram.get_stats();
    // Percentiles are approximated from the histogram's bucket bounds.
    let (p95, p99) = Self::calculate_percentiles(&stats.buckets, stats.count);
    GpuPerformanceStats {
        backend: *self.active_backend.read().await,
        total_gpu_operations: self.gpu_operation_counter.get(),
        cpu_fallback_operations: self.cpu_fallback_counter.get(),
        avg_latency_ms: stats.mean,
        p95_latency_ms: p95,
        p99_latency_ms: p99,
    }
}
/// Approximate the p95/p99 latencies from histogram buckets.
///
/// `buckets` is a list of `(upper_bound, count)` pairs assumed to be in
/// ascending bound order; each percentile resolves to the bound of the
/// first bucket whose cumulative count reaches the target rank.
///
/// Fixes over the previous version:
/// - target ranks use `ceil` and are at least 1, so integer truncation can
///   no longer select a bucket below the requested percentile (or match an
///   empty leading bucket when the rank truncated to 0);
/// - `Option` replaces the `0.0` sentinel, which misfired whenever a
///   legitimate bucket bound was exactly `0.0`.
fn calculate_percentiles(buckets: &[(f64, u64)], total_count: u64) -> (f64, f64) {
    if total_count == 0 || buckets.is_empty() {
        return (0.0, 0.0);
    }
    // 1-based cumulative-count rank that a percentile must reach.
    let rank = |q: f64| ((total_count as f64 * q).ceil() as u64).max(1);
    let (p95_rank, p99_rank) = (rank(0.95), rank(0.99));
    let mut cumulative = 0u64;
    let mut p95 = None;
    let mut p99 = None;
    for &(bound, count) in buckets {
        cumulative += count;
        if p95.is_none() && cumulative >= p95_rank {
            p95 = Some(bound);
        }
        if p99.is_none() && cumulative >= p99_rank {
            p99 = Some(bound);
        }
        if p95.is_some() && p99.is_some() {
            break;
        }
    }
    // If the counts never reached a target (inconsistent totals), fall
    // back to the largest bucket bound.
    let last_bound = buckets
        .last()
        .expect("collection validated to be non-empty")
        .0;
    (p95.unwrap_or(last_bound), p99.unwrap_or(last_bound))
}
}
/// Aggregated performance snapshot returned by
/// `GpuAcceleratedCluster::get_performance_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuPerformanceStats {
    /// Backend that was active when the snapshot was taken.
    pub backend: GpuBackend,
    /// Total operations routed through the public entry points.
    pub total_gpu_operations: u64,
    /// Operations that fell back to the CPU path.
    pub cpu_fallback_operations: u64,
    /// Mean per-operation latency in milliseconds.
    pub avg_latency_ms: f64,
    /// Approximate 95th-percentile latency (histogram bucket bound).
    pub p95_latency_ms: f64,
    /// Approximate 99th-percentile latency (histogram bucket bound).
    pub p99_latency_ms: f64,
}
use scirs2_core::ndarray_ext::s;
#[cfg(test)]
mod tests {
    use super::*;

    // All tests pin the backend to ParallelCpu so they run on machines
    // without GPU hardware or the optional cargo features.

    #[tokio::test]
    async fn test_gpu_cluster_initialization() {
        let config = GpuConfig {
            backend: GpuBackend::ParallelCpu,
            ..Default::default()
        };
        let gpu_cluster = GpuAcceleratedCluster::new(config).await.unwrap();
        assert_eq!(
            gpu_cluster.get_active_backend().await,
            GpuBackend::ParallelCpu
        );
    }

    #[tokio::test]
    async fn test_replica_selection() {
        // Node 1 dominates on every metric, so selection must pick it.
        let config = GpuConfig {
            backend: GpuBackend::ParallelCpu,
            warmup_iterations: 1,
            ..Default::default()
        };
        let gpu_cluster = GpuAcceleratedCluster::new(config).await.unwrap();
        let metrics = vec![
            ReplicaMetrics {
                node_id: 1,
                latency_ms: 10.0,
                connections: 5.0,
                lag_ms: 100.0,
                cpu_util: 0.3,
                mem_util: 0.4,
                success_rate: 0.99,
            },
            ReplicaMetrics {
                node_id: 2,
                latency_ms: 50.0,
                connections: 20.0,
                lag_ms: 500.0,
                cpu_util: 0.8,
                mem_util: 0.9,
                success_rate: 0.80,
            },
            ReplicaMetrics {
                node_id: 3,
                latency_ms: 20.0,
                connections: 10.0,
                lag_ms: 200.0,
                cpu_util: 0.5,
                mem_util: 0.6,
                success_rate: 0.95,
            },
        ];
        let (best_node, confidence) = gpu_cluster.select_best_replica(&metrics).await.unwrap();
        assert_eq!(best_node, 1);
        // The second tuple element is a confidence value, expected in (0, 1].
        assert!(confidence > 0.0 && confidence <= 1.0);
    }

    #[tokio::test]
    async fn test_load_forecasting() {
        // Steadily increasing history without seasonality; output vectors
        // must match the requested horizon.
        let config = GpuConfig {
            backend: GpuBackend::ParallelCpu,
            warmup_iterations: 1,
            ..Default::default()
        };
        let gpu_cluster = GpuAcceleratedCluster::new(config).await.unwrap();
        let params = LoadForecastParams {
            history: vec![10.0, 12.0, 15.0, 13.0, 16.0, 18.0, 20.0, 19.0, 22.0, 24.0],
            horizon: 3,
            confidence_level: 0.95,
            detect_seasonality: false,
        };
        let forecast = gpu_cluster.forecast_load(params).await.unwrap();
        assert_eq!(forecast.predictions.len(), 3);
        assert_eq!(forecast.upper_bound.len(), 3);
        assert_eq!(forecast.lower_bound.len(), 3);
        assert!(forecast.accuracy_score >= 0.0 && forecast.accuracy_score <= 1.0);
    }

    #[tokio::test]
    async fn test_performance_stats() {
        // A single selection call must be reflected in the operation counter.
        let config = GpuConfig {
            backend: GpuBackend::ParallelCpu,
            warmup_iterations: 1,
            ..Default::default()
        };
        let gpu_cluster = GpuAcceleratedCluster::new(config).await.unwrap();
        let metrics = vec![ReplicaMetrics {
            node_id: 1,
            latency_ms: 10.0,
            connections: 5.0,
            lag_ms: 100.0,
            cpu_util: 0.3,
            mem_util: 0.4,
            success_rate: 0.99,
        }];
        let _ = gpu_cluster.select_best_replica(&metrics).await;
        let stats = gpu_cluster.get_performance_stats().await;
        assert!(stats.total_gpu_operations > 0);
    }
}