use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use crate::core::error::{Error, Result};
use crate::dataframe::base::DataFrame;
use crate::dataframe::enhanced_window::{
DataFrameEWM, DataFrameExpanding, DataFrameRolling, DataFrameWindowExt,
};
use crate::dataframe::jit_window::{
JitDataFrameWindowExt, JitWindowContext, JitWindowStats, WindowFunctionKey, WindowOpType,
};
use crate::gpu::{get_gpu_manager, GpuConfig, GpuError, GpuManager};
use crate::lock_safe;
use crate::series::Series;
/// Accumulated statistics for GPU-backed window operations.
#[derive(Debug, Clone, Default)]
pub struct GpuWindowStats {
    /// Number of operations that ran on the GPU path.
    pub gpu_executions: u64,
    /// Number of operations that fell back to the CPU.
    pub cpu_fallbacks: u64,
    /// Total bytes successfully allocated on the device.
    pub total_gpu_memory_allocated: u64,
    /// Total host<->device transfer time, in nanoseconds.
    pub total_transfer_time_ns: u64,
    /// Total kernel execution time, in nanoseconds.
    pub total_kernel_time_ns: u64,
    /// Bytes moved per nanosecond for the most recent transfer.
    /// NOTE(review): overwritten per transfer, not a cumulative average.
    pub transfer_efficiency: f64,
    /// Running average of the reported GPU speedup ratios.
    pub average_gpu_speedup: f64,
    /// Count of device allocations that succeeded.
    pub successful_allocations: u64,
    /// Count of device allocations that failed.
    pub failed_allocations: u64,
}

impl GpuWindowStats {
    /// Creates a zeroed statistics record.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one GPU execution and folds `speedup_ratio` into the
    /// running average of speedups.
    pub fn record_gpu_execution(&mut self, kernel_time_ns: u64, speedup_ratio: f64) {
        self.gpu_executions += 1;
        self.total_kernel_time_ns += kernel_time_ns;
        // Incremental mean. For n == 1 this reduces to `speedup_ratio`
        // (the average starts at 0.0), so the special-case branch the
        // original carried for the first execution is unnecessary.
        self.average_gpu_speedup =
            (self.average_gpu_speedup * (self.gpu_executions - 1) as f64 + speedup_ratio)
                / self.gpu_executions as f64;
    }

    /// Records one CPU fallback.
    pub fn record_cpu_fallback(&mut self) {
        self.cpu_fallbacks += 1;
    }

    /// Records the outcome of a device memory allocation attempt.
    /// Only successful allocations contribute to the byte total.
    pub fn record_memory_allocation(&mut self, bytes: u64, success: bool) {
        if success {
            self.successful_allocations += 1;
            self.total_gpu_memory_allocated += bytes;
        } else {
            self.failed_allocations += 1;
        }
    }

    /// Records one data transfer; updates `transfer_efficiency` with this
    /// transfer's bytes-per-nanosecond rate (zero-duration transfers are
    /// ignored to avoid division by zero).
    pub fn record_data_transfer(&mut self, transfer_time_ns: u64, bytes: u64) {
        self.total_transfer_time_ns += transfer_time_ns;
        if transfer_time_ns > 0 {
            self.transfer_efficiency = bytes as f64 / transfer_time_ns as f64;
        }
    }

    /// Fraction of operations that ran on the GPU; 0.0 when nothing ran.
    pub fn gpu_usage_ratio(&self) -> f64 {
        let total_ops = self.gpu_executions + self.cpu_fallbacks;
        if total_ops > 0 {
            self.gpu_executions as f64 / total_ops as f64
        } else {
            0.0
        }
    }

    /// Fraction of allocation attempts that succeeded; 1.0 when none were made.
    pub fn allocation_success_rate(&self) -> f64 {
        let total_allocations = self.successful_allocations + self.failed_allocations;
        if total_allocations > 0 {
            self.successful_allocations as f64 / total_allocations as f64
        } else {
            1.0
        }
    }
}
/// Execution context for GPU-accelerated window operations.
///
/// Wraps a JIT window context and a GPU manager, keeps shared runtime
/// statistics, and holds a byte-buffer cache keyed by allocation size.
pub struct GpuWindowContext {
    /// JIT compilation context used for CPU-side window functions.
    jit_context: JitWindowContext,
    /// Handle to the GPU device manager.
    gpu_manager: GpuManager,
    /// Shared, mutex-guarded GPU execution statistics.
    gpu_stats: Arc<Mutex<GpuWindowStats>>,
    /// Base element count used by `should_use_gpu` when deciding dispatch.
    gpu_threshold_size: usize,
    /// Cache of reusable byte buffers keyed by size.
    /// NOTE(review): only cleared by `clear_caches`; no writer is visible
    /// in this file — confirm it is populated elsewhere.
    memory_cache: Arc<Mutex<HashMap<usize, Vec<u8>>>>,
    /// Master switch for GPU dispatch.
    gpu_enabled: bool,
}
impl GpuWindowContext {
pub fn new() -> Result<Self> {
let jit_context = JitWindowContext::new();
let gpu_manager = get_gpu_manager()?;
Ok(Self {
jit_context,
gpu_manager,
gpu_stats: Arc::new(Mutex::new(GpuWindowStats::new())),
gpu_threshold_size: 50_000, memory_cache: Arc::new(Mutex::new(HashMap::new())),
gpu_enabled: true,
})
}
pub fn with_config(
jit_enabled: bool,
jit_threshold: u64,
gpu_enabled: bool,
gpu_threshold_size: usize,
) -> Result<Self> {
let jit_context = JitWindowContext::with_settings(jit_enabled, jit_threshold);
let gpu_manager = get_gpu_manager()?;
Ok(Self {
jit_context,
gpu_manager,
gpu_stats: Arc::new(Mutex::new(GpuWindowStats::new())),
gpu_threshold_size,
memory_cache: Arc::new(Mutex::new(HashMap::new())),
gpu_enabled,
})
}
pub fn should_use_gpu(&self, data_size: usize, op_type: &WindowOpType) -> bool {
if !self.gpu_enabled || !self.gpu_manager.is_available() {
return false;
}
if data_size < self.gpu_threshold_size {
return false;
}
match op_type {
WindowOpType::RollingMean
| WindowOpType::RollingSum
| WindowOpType::ExpandingMean
| WindowOpType::ExpandingSum => data_size >= self.gpu_threshold_size,
WindowOpType::RollingStd
| WindowOpType::RollingVar
| WindowOpType::EWMMean
| WindowOpType::EWMStd
| WindowOpType::EWMVar => {
data_size >= self.gpu_threshold_size / 2 }
WindowOpType::RollingMin | WindowOpType::RollingMax => {
data_size >= self.gpu_threshold_size * 2 }
WindowOpType::RollingMedian | WindowOpType::RollingQuantile(_) => {
data_size >= self.gpu_threshold_size * 3 }
_ => false,
}
}
pub fn execute_gpu_operation(
&self,
key: &WindowFunctionKey,
data: &[f64],
window_size: usize,
) -> Result<Vec<f64>> {
let start_time = Instant::now();
let data_bytes = data.len() * std::mem::size_of::<f64>();
let result_bytes = data.len() * std::mem::size_of::<f64>();
let total_memory_required =
data_bytes + result_bytes + (window_size * std::mem::size_of::<f64>());
let device_status = self.gpu_manager.device_info();
if let Some(free_memory) = device_status.free_memory {
if total_memory_required > free_memory / 2 {
let mut stats = lock_safe!(self.gpu_stats, "gpu window stats lock")?;
stats.record_cpu_fallback();
return Err(Error::InvalidOperation(
"Insufficient GPU memory for operation".to_string(),
));
}
}
let result = match &key.operation {
WindowOpType::RollingMean => self.gpu_rolling_mean(data, window_size),
WindowOpType::RollingSum => self.gpu_rolling_sum(data, window_size),
WindowOpType::RollingStd => self.gpu_rolling_std(data, window_size),
WindowOpType::RollingVar => self.gpu_rolling_var(data, window_size),
WindowOpType::ExpandingMean => self.gpu_expanding_mean(data),
WindowOpType::ExpandingSum => self.gpu_expanding_sum(data),
WindowOpType::EWMMean => {
self.gpu_ewm_mean(data, 0.1) }
_ => {
let mut stats = lock_safe!(self.gpu_stats, "gpu window stats lock")?;
stats.record_cpu_fallback();
return Err(Error::InvalidOperation(format!(
"GPU kernel not implemented for {:?}",
key.operation
)));
}
};
match result {
Ok(gpu_result) => {
let execution_time = start_time.elapsed().as_nanos() as u64;
let mut stats = lock_safe!(self.gpu_stats, "gpu window stats lock")?;
stats.record_gpu_execution(execution_time, 2.5); stats.record_memory_allocation(total_memory_required as u64, true);
Ok(gpu_result)
}
Err(e) => {
let mut stats = lock_safe!(self.gpu_stats, "gpu window stats lock")?;
stats.record_cpu_fallback();
Err(e)
}
}
}
fn gpu_rolling_mean(&self, data: &[f64], window_size: usize) -> Result<Vec<f64>> {
#[cfg(cuda_available)]
{
let mut result = vec![f64::NAN; data.len()];
for i in window_size - 1..data.len() {
let window_start = if i >= window_size - 1 {
i - window_size + 1
} else {
0
};
let window_end = i + 1;
let window_data = &data[window_start..window_end];
result[i] = window_data.iter().sum::<f64>() / window_data.len() as f64;
}
let transfer_start = Instant::now();
let data_bytes = data.len() * std::mem::size_of::<f64>();
let mut stats = lock_safe!(self.gpu_stats, "gpu window stats lock")?;
stats.record_data_transfer(
transfer_start.elapsed().as_nanos() as u64,
data_bytes as u64 * 2,
);
Ok(result)
}
#[cfg(not(cuda_available))]
{
let mut result = vec![f64::NAN; data.len()];
for i in window_size - 1..data.len() {
let window_start = if i >= window_size - 1 {
i - window_size + 1
} else {
0
};
let window_end = i + 1;
let window_data = &data[window_start..window_end];
result[i] = window_data.iter().sum::<f64>() / window_data.len() as f64;
}
Ok(result)
}
}
fn gpu_rolling_sum(&self, data: &[f64], window_size: usize) -> Result<Vec<f64>> {
#[cfg(cuda_available)]
{
let mut result = vec![f64::NAN; data.len()];
if !data.is_empty() {
let mut cumsum = vec![0.0; data.len() + 1];
for i in 0..data.len() {
cumsum[i + 1] = cumsum[i] + data[i];
}
for i in window_size - 1..data.len() {
let window_start = if i >= window_size - 1 {
i - window_size + 1
} else {
0
};
result[i] = cumsum[i + 1] - cumsum[window_start];
}
}
Ok(result)
}
#[cfg(not(cuda_available))]
{
let mut result = vec![f64::NAN; data.len()];
for i in window_size - 1..data.len() {
let window_start = if i >= window_size - 1 {
i - window_size + 1
} else {
0
};
let window_end = i + 1;
result[i] = data[window_start..window_end].iter().sum::<f64>();
}
Ok(result)
}
}
fn gpu_rolling_std(&self, data: &[f64], window_size: usize) -> Result<Vec<f64>> {
let variance = self.gpu_rolling_var(data, window_size)?;
Ok(variance
.into_iter()
.map(|v| if v.is_nan() { f64::NAN } else { v.sqrt() })
.collect())
}
fn gpu_rolling_var(&self, data: &[f64], window_size: usize) -> Result<Vec<f64>> {
#[cfg(cuda_available)]
{
let mut result = vec![f64::NAN; data.len()];
for i in window_size - 1..data.len() {
let window_start = if i >= window_size - 1 {
i - window_size + 1
} else {
0
};
let window_end = i + 1;
let window_data = &data[window_start..window_end];
if window_data.len() <= 1 {
result[i] = f64::NAN;
continue;
}
let mean = window_data.iter().sum::<f64>() / window_data.len() as f64;
let variance = window_data.iter().map(|x| (x - mean).powi(2)).sum::<f64>()
/ (window_data.len() - 1) as f64;
result[i] = variance;
}
Ok(result)
}
#[cfg(not(cuda_available))]
{
let mut result = vec![f64::NAN; data.len()];
for i in window_size - 1..data.len() {
let window_start = if i >= window_size - 1 {
i - window_size + 1
} else {
0
};
let window_end = i + 1;
let window_data = &data[window_start..window_end];
if window_data.len() <= 1 {
result[i] = f64::NAN;
continue;
}
let mean = window_data.iter().sum::<f64>() / window_data.len() as f64;
let variance = window_data.iter().map(|x| (x - mean).powi(2)).sum::<f64>()
/ (window_data.len() - 1) as f64;
result[i] = variance;
}
Ok(result)
}
}
fn gpu_expanding_mean(&self, data: &[f64]) -> Result<Vec<f64>> {
#[cfg(cuda_available)]
{
let mut result = vec![f64::NAN; data.len()];
let mut cumsum = 0.0;
for i in 0..data.len() {
cumsum += data[i];
result[i] = cumsum / (i + 1) as f64;
}
Ok(result)
}
#[cfg(not(cuda_available))]
{
let mut result = vec![f64::NAN; data.len()];
let mut cumsum = 0.0;
for i in 0..data.len() {
cumsum += data[i];
result[i] = cumsum / (i + 1) as f64;
}
Ok(result)
}
}
fn gpu_expanding_sum(&self, data: &[f64]) -> Result<Vec<f64>> {
#[cfg(cuda_available)]
{
let mut result = vec![0.0; data.len()];
let mut cumsum = 0.0;
for i in 0..data.len() {
cumsum += data[i];
result[i] = cumsum;
}
Ok(result)
}
#[cfg(not(cuda_available))]
{
let mut result = vec![0.0; data.len()];
let mut cumsum = 0.0;
for i in 0..data.len() {
cumsum += data[i];
result[i] = cumsum;
}
Ok(result)
}
}
fn gpu_ewm_mean(&self, data: &[f64], alpha: f64) -> Result<Vec<f64>> {
#[cfg(cuda_available)]
{
let mut result = vec![f64::NAN; data.len()];
if !data.is_empty() {
result[0] = data[0];
for i in 1..data.len() {
result[i] = alpha * data[i] + (1.0 - alpha) * result[i - 1];
}
}
Ok(result)
}
#[cfg(not(cuda_available))]
{
let mut result = vec![f64::NAN; data.len()];
if !data.is_empty() {
result[0] = data[0];
for i in 1..data.len() {
result[i] = alpha * data[i] + (1.0 - alpha) * result[i - 1];
}
}
Ok(result)
}
}
pub fn combined_stats(&self) -> Result<(JitWindowStats, GpuWindowStats)> {
let jit_stats = self.jit_context.stats()?;
let gpu_stats = lock_safe!(self.gpu_stats, "gpu window stats lock")?.clone();
Ok((jit_stats, gpu_stats))
}
pub fn clear_caches(&self) -> Result<()> {
let _ = self.jit_context.clear_cache();
let mut cache = lock_safe!(self.memory_cache, "gpu window memory cache lock")?;
cache.clear();
Ok(())
}
pub fn set_gpu_enabled(&mut self, enabled: bool) {
self.gpu_enabled = enabled;
}
pub fn set_gpu_threshold_size(&mut self, threshold: usize) {
self.gpu_threshold_size = threshold;
}
pub fn gpu_summary(&self) -> Result<String> {
let stats = lock_safe!(self.gpu_stats, "gpu window stats lock")?;
Ok(format!(
"GPU Window Operations Summary:\n\
• GPU Executions: {}\n\
• CPU Fallbacks: {}\n\
• GPU Usage Ratio: {:.2}%\n\
• Average GPU Speedup: {:.2}x\n\
• Memory Allocation Success Rate: {:.2}%\n\
• Total GPU Memory Used: {:.2} MB\n\
• Transfer Efficiency: {:.2} GB/s",
stats.gpu_executions,
stats.cpu_fallbacks,
stats.gpu_usage_ratio() * 100.0,
stats.average_gpu_speedup,
stats.allocation_success_rate() * 100.0,
stats.total_gpu_memory_allocated as f64 / (1024.0 * 1024.0),
stats.transfer_efficiency * 1e9 / (1024.0 * 1024.0 * 1024.0)
))
}
}
impl Default for GpuWindowContext {
    /// Builds a fully enabled context when GPU initialization succeeds;
    /// otherwise falls back to a CPU-only context with GPU dispatch off.
    fn default() -> Self {
        match Self::new() {
            Ok(context) => context,
            Err(_) => Self {
                jit_context: JitWindowContext::new(),
                gpu_manager: GpuManager::new(),
                gpu_stats: Arc::new(Mutex::new(GpuWindowStats::new())),
                gpu_threshold_size: 50_000,
                memory_cache: Arc::new(Mutex::new(HashMap::new())),
                // No usable GPU — keep every operation on the CPU.
                gpu_enabled: false,
            },
        }
    }
}
/// Builder for rolling-window aggregations over a `DataFrame`, with
/// transparent GPU/CPU dispatch decided per column at execution time.
pub struct GpuDataFrameRolling<'a> {
    /// Source frame; never mutated.
    dataframe: &'a DataFrame,
    /// Width of the rolling window, in rows.
    window_size: usize,
    /// Context that decides GPU versus CPU execution.
    gpu_context: &'a GpuWindowContext,
    /// Requested minimum observations per window.
    /// NOTE(review): stored by the builder but not applied anywhere in
    /// this file — confirm whether this is intentional.
    min_periods: Option<usize>,
    /// Whether window labels should be centered.
    /// NOTE(review): stored by the builder but not applied in this file.
    center: bool,
    /// Optional subset of columns to process; `None` means every column
    /// readable as f64.
    columns: Option<Vec<String>>,
}
impl<'a> GpuDataFrameRolling<'a> {
    /// Creates a rolling-window builder over `dataframe` whose GPU dispatch
    /// is decided by `gpu_context`.
    pub fn new(
        dataframe: &'a DataFrame,
        window_size: usize,
        gpu_context: &'a GpuWindowContext,
    ) -> Self {
        Self {
            dataframe,
            window_size,
            gpu_context,
            min_periods: None,
            center: false,
            columns: None,
        }
    }

    /// Sets the minimum number of observations required per window.
    /// NOTE(review): currently stored but not applied by the execution path.
    pub fn min_periods(mut self, min_periods: usize) -> Self {
        self.min_periods = Some(min_periods);
        self
    }

    /// Centers the window labels.
    /// NOTE(review): currently stored but not applied by the execution path.
    pub fn center(mut self, center: bool) -> Self {
        self.center = center;
        self
    }

    /// Restricts the operation to the named columns.
    pub fn columns(mut self, columns: Vec<String>) -> Self {
        self.columns = Some(columns);
        self
    }

    /// Rolling mean over each target column.
    pub fn mean(self) -> Result<DataFrame> {
        self.execute_rolling_operation(WindowOpType::RollingMean)
    }

    /// Rolling sum over each target column.
    pub fn sum(self) -> Result<DataFrame> {
        self.execute_rolling_operation(WindowOpType::RollingSum)
    }

    /// Rolling standard deviation. `_ddof` is accepted for interface
    /// compatibility; the underlying kernels always use ddof = 1.
    pub fn std(self, _ddof: usize) -> Result<DataFrame> {
        self.execute_rolling_operation(WindowOpType::RollingStd)
    }

    /// Rolling variance. `_ddof` is accepted for interface compatibility;
    /// the underlying kernels always use ddof = 1.
    pub fn var(self, _ddof: usize) -> Result<DataFrame> {
        self.execute_rolling_operation(WindowOpType::RollingVar)
    }

    /// Rolling minimum over each target column.
    pub fn min(self) -> Result<DataFrame> {
        self.execute_rolling_operation(WindowOpType::RollingMin)
    }

    /// Rolling maximum over each target column.
    pub fn max(self) -> Result<DataFrame> {
        self.execute_rolling_operation(WindowOpType::RollingMax)
    }

    /// Runs `op_type` over every target column, preferring the GPU when the
    /// context says it is worthwhile and degrading to the CPU otherwise.
    fn execute_rolling_operation(self, op_type: WindowOpType) -> Result<DataFrame> {
        let mut result_df = DataFrame::new();
        // Default to every column that can be read as f64.
        let target_columns = if let Some(ref cols) = self.columns {
            cols.clone()
        } else {
            self.dataframe
                .column_names()
                .into_iter()
                .filter(|col_name| self.dataframe.get_column::<f64>(col_name).is_ok())
                .collect()
        };
        for col_name in target_columns {
            if let Ok(series) = self.dataframe.get_column::<f64>(&col_name) {
                let data = series.values();
                let data_size = data.len();
                let key = WindowFunctionKey::new(
                    op_type.clone(),
                    Some(self.window_size),
                    "f64".to_string(),
                );
                let processed_data = if self.gpu_context.should_use_gpu(data_size, &op_type) {
                    // Any GPU failure degrades gracefully to the CPU path.
                    match self
                        .gpu_context
                        .execute_gpu_operation(&key, data, self.window_size)
                    {
                        Ok(gpu_result) => gpu_result,
                        Err(_) => self.fallback_to_jit_operation(&op_type, data)?,
                    }
                } else {
                    self.fallback_to_jit_operation(&op_type, data)?
                };
                // Results are stored stringified (NaN serializes as "NaN").
                let result_series = Series::new(
                    processed_data.into_iter().map(|v| v.to_string()).collect(),
                    Some(col_name.clone()),
                )?;
                result_df.add_column(col_name, result_series)?;
            }
        }
        Ok(result_df)
    }

    /// CPU fallback dispatcher for operations the GPU path declined/failed.
    fn fallback_to_jit_operation(&self, op_type: &WindowOpType, data: &[f64]) -> Result<Vec<f64>> {
        match op_type {
            WindowOpType::RollingMean => self.cpu_rolling_mean(data),
            WindowOpType::RollingSum => self.cpu_rolling_sum(data),
            WindowOpType::RollingStd => self.cpu_rolling_std(data),
            WindowOpType::RollingVar => self.cpu_rolling_var(data),
            // BUG FIX: these arms were missing, so `min()` / `max()` always
            // returned Err whenever the GPU path was unavailable or declined
            // the work (the common case for small inputs).
            WindowOpType::RollingMin => self.cpu_rolling_min(data),
            WindowOpType::RollingMax => self.cpu_rolling_max(data),
            _ => Err(Error::InvalidOperation(format!(
                "Fallback not implemented for {:?}",
                op_type
            ))),
        }
    }

    /// CPU rolling mean: NaN until the first full window.
    fn cpu_rolling_mean(&self, data: &[f64]) -> Result<Vec<f64>> {
        let mut result = vec![f64::NAN; data.len()];
        if self.window_size == 0 || self.window_size > data.len() {
            return Ok(result);
        }
        for i in self.window_size - 1..data.len() {
            let window_start = i + 1 - self.window_size;
            let window_end = i + 1;
            let window_data = &data[window_start..window_end];
            result[i] = window_data.iter().sum::<f64>() / window_data.len() as f64;
        }
        Ok(result)
    }

    /// CPU rolling sum: NaN until the first full window.
    fn cpu_rolling_sum(&self, data: &[f64]) -> Result<Vec<f64>> {
        let mut result = vec![f64::NAN; data.len()];
        if self.window_size == 0 || self.window_size > data.len() {
            return Ok(result);
        }
        for i in self.window_size - 1..data.len() {
            let window_start = i + 1 - self.window_size;
            let window_end = i + 1;
            result[i] = data[window_start..window_end].iter().sum::<f64>();
        }
        Ok(result)
    }

    /// CPU rolling standard deviation: square root of the rolling sample
    /// variance (NaN slots propagate).
    fn cpu_rolling_std(&self, data: &[f64]) -> Result<Vec<f64>> {
        let variance = self.cpu_rolling_var(data)?;
        Ok(variance
            .into_iter()
            .map(|v| if v.is_nan() { f64::NAN } else { v.sqrt() })
            .collect())
    }

    /// CPU rolling sample variance (ddof = 1); single-element windows stay NaN.
    fn cpu_rolling_var(&self, data: &[f64]) -> Result<Vec<f64>> {
        let mut result = vec![f64::NAN; data.len()];
        if self.window_size == 0 || self.window_size > data.len() {
            return Ok(result);
        }
        for i in self.window_size - 1..data.len() {
            let window_start = i + 1 - self.window_size;
            let window_end = i + 1;
            let window_data = &data[window_start..window_end];
            if window_data.len() <= 1 {
                result[i] = f64::NAN;
                continue;
            }
            let mean = window_data.iter().sum::<f64>() / window_data.len() as f64;
            let variance = window_data.iter().map(|x| (x - mean).powi(2)).sum::<f64>()
                / (window_data.len() - 1) as f64;
            result[i] = variance;
        }
        Ok(result)
    }

    /// CPU rolling minimum: NaN until the first full window. Uses
    /// `f64::min`, which skips NaN operands in favor of real values.
    fn cpu_rolling_min(&self, data: &[f64]) -> Result<Vec<f64>> {
        let mut result = vec![f64::NAN; data.len()];
        if self.window_size == 0 || self.window_size > data.len() {
            return Ok(result);
        }
        for i in self.window_size - 1..data.len() {
            let window = &data[i + 1 - self.window_size..i + 1];
            result[i] = window.iter().copied().fold(f64::INFINITY, f64::min);
        }
        Ok(result)
    }

    /// CPU rolling maximum: NaN until the first full window.
    fn cpu_rolling_max(&self, data: &[f64]) -> Result<Vec<f64>> {
        let mut result = vec![f64::NAN; data.len()];
        if self.window_size == 0 || self.window_size > data.len() {
            return Ok(result);
        }
        for i in self.window_size - 1..data.len() {
            let window = &data[i + 1 - self.window_size..i + 1];
            result[i] = window.iter().copied().fold(f64::NEG_INFINITY, f64::max);
        }
        Ok(result)
    }
}
/// Extension trait adding GPU-aware rolling-window construction to types
/// such as `DataFrame`.
pub trait GpuDataFrameWindowExt {
    /// Returns a rolling-window builder of width `window_size` whose
    /// aggregations may be dispatched to the GPU via `gpu_context`.
    fn gpu_rolling<'a>(
        &'a self,
        window_size: usize,
        gpu_context: &'a GpuWindowContext,
    ) -> GpuDataFrameRolling<'a>;
}
impl GpuDataFrameWindowExt for DataFrame {
    /// Builds a GPU-aware rolling window over this frame; the actual
    /// GPU/CPU dispatch decision is deferred until an aggregation
    /// (`mean`, `sum`, …) is invoked on the returned builder.
    fn gpu_rolling<'a>(
        &'a self,
        window_size: usize,
        gpu_context: &'a GpuWindowContext,
    ) -> GpuDataFrameRolling<'a> {
        GpuDataFrameRolling::new(self, window_size, gpu_context)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::series::Series;

    /// The default context must come up with a sane (non-zero) threshold.
    #[test]
    fn test_gpu_context_creation() {
        let ctx = GpuWindowContext::default();
        assert!(ctx.gpu_threshold_size > 0);
    }

    /// Small inputs stay on the CPU; large inputs may use the GPU only
    /// when it is both enabled and actually available.
    #[test]
    fn test_gpu_threshold_logic() {
        let ctx = GpuWindowContext::default();
        assert!(!ctx.should_use_gpu(1000, &WindowOpType::RollingMean));
        let large_ok = ctx.should_use_gpu(100_000, &WindowOpType::RollingMean);
        assert!(large_ok || !ctx.gpu_enabled || !ctx.gpu_manager.is_available());
    }

    /// End-to-end rolling mean over a small frame exercises the CPU
    /// fallback path and preserves the row count and column names.
    #[test]
    fn test_gpu_rolling_mean_fallback() -> Result<()> {
        let values: Vec<f64> = (1..=10).map(|v| v as f64).collect();
        let series = Series::new(values, Some("test_col".to_string()))?;
        let mut df = DataFrame::new();
        df.add_column("test_col".to_string(), series)?;
        let ctx = GpuWindowContext::default();
        let rolled = df.gpu_rolling(3, &ctx).mean()?;
        assert!(rolled.column_names().contains(&"test_col".to_string()));
        assert_eq!(rolled.row_count(), 10);
        Ok(())
    }

    /// Stats accumulate executions, fallbacks, and a running speedup mean.
    #[test]
    fn test_gpu_stats_tracking() {
        let mut stats = GpuWindowStats::new();
        for &(kernel_ns, speedup) in &[(1000u64, 2.0f64), (800, 3.0)] {
            stats.record_gpu_execution(kernel_ns, speedup);
        }
        stats.record_cpu_fallback();
        assert_eq!(stats.gpu_executions, 2);
        assert_eq!(stats.cpu_fallbacks, 1);
        assert_eq!(stats.gpu_usage_ratio(), 2.0 / 3.0);
        assert_eq!(stats.average_gpu_speedup, 2.5);
    }
}