use std::time::Instant;
use async_trait::async_trait;
use ringkernel_core::RingContext;
use crate::messages::{
EWMAVolatilityInput, EWMAVolatilityOutput, VolatilityAnalysisInput, VolatilityAnalysisOutput,
};
use crate::ring_messages::{
QueryVolatilityResponse, QueryVolatilityRing, UpdateEWMAVolatilityResponse,
UpdateEWMAVolatilityRing, UpdateVolatilityResponse, UpdateVolatilityRing,
};
use crate::types::{GARCHCoefficients, GARCHParams, TimeSeries, VolatilityResult};
use rustkernel_core::{
domain::Domain,
error::Result,
kernel::KernelMetadata,
traits::{BatchKernel, GpuKernel, RingKernelHandler},
};
/// Per-asset volatility tracking state: a GARCH track plus a parallel
/// EWMA (exponentially weighted moving average) variance track.
#[derive(Debug, Clone, Default)]
pub struct AssetVolatilityState {
// Identifier of the tracked asset (echoed back in ring responses).
pub asset_id: u64,
// GARCH coefficients (omega, alpha lags, beta lags) used for updates.
pub coefficients: GARCHCoefficients,
// Latest GARCH conditional variance.
pub variance: f64,
// sqrt(variance), cached so queries avoid recomputing it.
pub volatility: f64,
// Latest EWMA variance (maintained independently of the GARCH track).
pub ewma_variance: f64,
// EWMA decay factor: weight given to the previous EWMA variance.
pub ewma_lambda: f64,
// Most recent return fed into this asset's state.
pub last_return: f64,
// Number of updates applied to this asset.
pub observation_count: u64,
}
/// Shared kernel state: all per-asset volatility tracks, keyed by asset id.
#[derive(Debug, Clone, Default)]
pub struct VolatilityState {
// Per-asset state, keyed by asset id.
pub assets: std::collections::HashMap<u64, AssetVolatilityState>,
// EWMA lambda applied when `update_asset` auto-creates an asset entry.
pub default_lambda: f64,
}
/// GARCH/EWMA volatility estimation kernel.
///
/// Supports batch computation over full return series and streaming
/// per-asset updates via ring messages. State sits behind an `RwLock`
/// because the ring handlers mutate it through `&self`.
#[derive(Debug)]
pub struct VolatilityAnalysis {
// Kernel registration metadata (id, domain, throughput/latency targets).
metadata: KernelMetadata,
// Interior-mutable per-asset state shared across handler calls.
state: std::sync::RwLock<VolatilityState>,
}
impl Clone for VolatilityAnalysis {
    /// Deep-copies the kernel, snapshotting the current per-asset state
    /// under a read lock. Panics if the lock is poisoned.
    fn clone(&self) -> Self {
        let snapshot = self.state.read().unwrap().clone();
        Self {
            metadata: self.metadata.clone(),
            state: std::sync::RwLock::new(snapshot),
        }
    }
}
impl Default for VolatilityAnalysis {
fn default() -> Self {
Self::new()
}
}
impl VolatilityAnalysis {
#[must_use]
pub fn new() -> Self {
Self {
metadata: KernelMetadata::ring(
"temporal/volatility-analysis",
Domain::TemporalAnalysis,
)
.with_description("GARCH model volatility estimation")
.with_throughput(100_000)
.with_latency_us(10.0),
state: std::sync::RwLock::new(VolatilityState {
assets: std::collections::HashMap::new(),
default_lambda: 0.94,
}),
}
}
pub fn initialize_asset(&self, asset_id: u64, initial_volatility: f64, lambda: f64) {
let mut state = self.state.write().unwrap();
let var = initial_volatility * initial_volatility;
state.assets.insert(
asset_id,
AssetVolatilityState {
asset_id,
coefficients: GARCHCoefficients {
omega: var * 0.1,
alpha: vec![0.1],
beta: vec![0.8],
},
variance: var,
volatility: initial_volatility,
ewma_variance: var,
ewma_lambda: lambda,
last_return: 0.0,
observation_count: 1,
},
);
}
pub fn update_asset(&self, asset_id: u64, return_value: f64) -> (f64, f64, u64) {
let mut state = self.state.write().unwrap();
let default_lambda = state.default_lambda;
let asset_state = state.assets.entry(asset_id).or_insert_with(|| {
let init_var = return_value * return_value;
AssetVolatilityState {
asset_id,
coefficients: GARCHCoefficients {
omega: init_var.max(0.0001) * 0.1,
alpha: vec![0.1],
beta: vec![0.8],
},
variance: init_var.max(0.0001),
volatility: init_var.max(0.0001).sqrt(),
ewma_variance: init_var.max(0.0001),
ewma_lambda: default_lambda,
last_return: return_value,
observation_count: 0,
}
});
let r_sq = return_value * return_value;
let omega = asset_state.coefficients.omega;
let alpha = asset_state
.coefficients
.alpha
.first()
.copied()
.unwrap_or(0.1);
let beta = asset_state
.coefficients
.beta
.first()
.copied()
.unwrap_or(0.8);
let new_variance = omega + alpha * r_sq + beta * asset_state.variance;
asset_state.variance = new_variance.max(1e-10);
asset_state.volatility = asset_state.variance.sqrt();
let lambda = asset_state.ewma_lambda;
asset_state.ewma_variance = lambda * asset_state.ewma_variance + (1.0 - lambda) * r_sq;
asset_state.last_return = return_value;
asset_state.observation_count += 1;
(
asset_state.volatility,
asset_state.variance,
asset_state.observation_count,
)
}
pub fn query_asset(&self, asset_id: u64, horizon: usize) -> Option<(f64, Vec<f64>, f64)> {
let state = self.state.read().unwrap();
state.assets.get(&asset_id).map(|asset_state| {
let vol = asset_state.volatility;
let alpha_sum: f64 = asset_state.coefficients.alpha.iter().sum();
let beta_sum: f64 = asset_state.coefficients.beta.iter().sum();
let persistence = (alpha_sum + beta_sum).min(0.999);
let _long_run_var = if persistence < 0.999 {
asset_state.coefficients.omega / (1.0 - persistence)
} else {
asset_state.variance
};
let mut forecast = Vec::with_capacity(horizon);
let mut prev_var = asset_state.variance;
for _ in 0..horizon {
prev_var = asset_state.coefficients.omega + persistence * prev_var;
forecast.push(prev_var.sqrt());
}
(vol, forecast, persistence)
})
}
pub fn update_ewma(&self, asset_id: u64, return_value: f64, lambda: f64) -> (f64, f64) {
let mut state = self.state.write().unwrap();
let r_sq = return_value * return_value;
let asset_state = state
.assets
.entry(asset_id)
.or_insert_with(|| AssetVolatilityState {
asset_id,
coefficients: GARCHCoefficients::default(),
variance: r_sq.max(0.0001),
volatility: r_sq.max(0.0001).sqrt(),
ewma_variance: r_sq.max(0.0001),
ewma_lambda: lambda,
last_return: return_value,
observation_count: 0,
});
asset_state.ewma_variance = lambda * asset_state.ewma_variance + (1.0 - lambda) * r_sq;
asset_state.ewma_lambda = lambda;
asset_state.observation_count += 1;
(asset_state.ewma_variance, asset_state.ewma_variance.sqrt())
}
pub fn compute(
returns: &TimeSeries,
params: GARCHParams,
forecast_horizon: usize,
) -> VolatilityResult {
if returns.len() < 10 {
let var = returns.variance();
return VolatilityResult {
variance: vec![var; returns.len()],
volatility: vec![var.sqrt(); returns.len()],
coefficients: GARCHCoefficients {
omega: var,
alpha: Vec::new(),
beta: Vec::new(),
},
forecast: vec![var.sqrt(); forecast_horizon],
};
}
let coefficients = Self::fit_garch(returns, params);
let variance = Self::calculate_variance(returns, &coefficients);
let volatility: Vec<f64> = variance.iter().map(|v| v.sqrt()).collect();
let forecast = Self::forecast_volatility(returns, &coefficients, forecast_horizon);
VolatilityResult {
variance,
volatility,
coefficients,
forecast,
}
}
pub fn compute_ewma(
returns: &TimeSeries,
lambda: f64,
forecast_horizon: usize,
) -> VolatilityResult {
let n = returns.len();
if n == 0 {
return VolatilityResult {
variance: Vec::new(),
volatility: Vec::new(),
coefficients: GARCHCoefficients {
omega: 0.0,
alpha: vec![1.0 - lambda],
beta: vec![lambda],
},
forecast: Vec::new(),
};
}
let initial_var = returns.variance();
let mut variance = vec![0.0; n];
variance[0] = initial_var;
for i in 1..n {
let r_sq = returns.values[i - 1].powi(2);
variance[i] = lambda * variance[i - 1] + (1.0 - lambda) * r_sq;
}
let volatility: Vec<f64> = variance.iter().map(|v| v.sqrt()).collect();
let last_var = *variance.last().unwrap_or(&initial_var);
let forecast = vec![last_var.sqrt(); forecast_horizon];
VolatilityResult {
variance,
volatility,
coefficients: GARCHCoefficients {
omega: 0.0, alpha: vec![1.0 - lambda],
beta: vec![lambda],
},
forecast,
}
}
#[allow(clippy::needless_range_loop)]
pub fn compute_realized(returns: &TimeSeries, window: usize) -> Vec<f64> {
let n = returns.len();
let w = window.min(n).max(1);
let mut realized = vec![0.0; n];
for i in 0..n {
let start = i.saturating_sub(w - 1);
let sq_sum: f64 = returns.values[start..=i].iter().map(|r| r.powi(2)).sum();
realized[i] = (sq_sum / (i - start + 1) as f64).sqrt();
}
realized
}
fn fit_garch(returns: &TimeSeries, params: GARCHParams) -> GARCHCoefficients {
let _n = returns.len();
let unconditional_var = returns.variance();
let p = params.p.max(1);
let q = params.q.max(1);
let mut omega = unconditional_var * 0.1;
let mut alpha = vec![0.1 / p as f64; p];
let mut beta = vec![0.8 / q as f64; q];
let mut best_likelihood = f64::NEG_INFINITY;
let mut best_coeffs = (omega, alpha.clone(), beta.clone());
for omega_scale in [0.05, 0.1, 0.15, 0.2] {
for alpha_sum in [0.05, 0.1, 0.15, 0.2] {
for beta_sum in [0.7, 0.75, 0.8, 0.85, 0.9] {
if alpha_sum + beta_sum >= 0.999 {
continue;
}
let test_omega = unconditional_var * omega_scale;
let test_alpha: Vec<f64> = (0..p).map(|_| alpha_sum / p as f64).collect();
let test_beta: Vec<f64> = (0..q).map(|_| beta_sum / q as f64).collect();
let test_coeffs = GARCHCoefficients {
omega: test_omega,
alpha: test_alpha.clone(),
beta: test_beta.clone(),
};
let variance = Self::calculate_variance(returns, &test_coeffs);
let likelihood = Self::log_likelihood(&returns.values, &variance);
if likelihood > best_likelihood && likelihood.is_finite() {
best_likelihood = likelihood;
best_coeffs = (test_omega, test_alpha, test_beta);
}
}
}
}
omega = best_coeffs.0;
alpha = best_coeffs.1;
beta = best_coeffs.2;
for _ in 0..10 {
let current_coeffs = GARCHCoefficients {
omega,
alpha: alpha.clone(),
beta: beta.clone(),
};
let current_variance = Self::calculate_variance(returns, ¤t_coeffs);
let current_ll = Self::log_likelihood(&returns.values, ¤t_variance);
let delta = 0.01;
let mut improved = false;
for sign in [-1.0, 1.0] {
let new_omega = (omega + sign * delta * unconditional_var).max(1e-10);
let test_coeffs = GARCHCoefficients {
omega: new_omega,
alpha: alpha.clone(),
beta: beta.clone(),
};
let test_var = Self::calculate_variance(returns, &test_coeffs);
let test_ll = Self::log_likelihood(&returns.values, &test_var);
if test_ll > current_ll && test_ll.is_finite() {
omega = new_omega;
improved = true;
break;
}
}
for i in 0..p {
for sign in [-1.0, 1.0] {
let mut new_alpha = alpha.clone();
new_alpha[i] = (new_alpha[i] + sign * delta).clamp(0.001, 0.5);
let alpha_sum: f64 = new_alpha.iter().sum();
let beta_sum: f64 = beta.iter().sum();
if alpha_sum + beta_sum >= 0.999 {
continue;
}
let test_coeffs = GARCHCoefficients {
omega,
alpha: new_alpha.clone(),
beta: beta.clone(),
};
let test_var = Self::calculate_variance(returns, &test_coeffs);
let test_ll = Self::log_likelihood(&returns.values, &test_var);
if test_ll > current_ll && test_ll.is_finite() {
alpha = new_alpha;
improved = true;
break;
}
}
}
for i in 0..q {
for sign in [-1.0, 1.0] {
let mut new_beta = beta.clone();
new_beta[i] = (new_beta[i] + sign * delta).clamp(0.001, 0.99);
let alpha_sum: f64 = alpha.iter().sum();
let beta_sum: f64 = new_beta.iter().sum();
if alpha_sum + beta_sum >= 0.999 {
continue;
}
let test_coeffs = GARCHCoefficients {
omega,
alpha: alpha.clone(),
beta: new_beta.clone(),
};
let test_var = Self::calculate_variance(returns, &test_coeffs);
let test_ll = Self::log_likelihood(&returns.values, &test_var);
if test_ll > current_ll && test_ll.is_finite() {
beta = new_beta;
improved = true;
break;
}
}
}
if !improved {
break;
}
}
GARCHCoefficients { omega, alpha, beta }
}
fn calculate_variance(returns: &TimeSeries, coeffs: &GARCHCoefficients) -> Vec<f64> {
let n = returns.len();
let p = coeffs.alpha.len();
let q = coeffs.beta.len();
let init_var = returns.variance().max(1e-10);
let mut variance = vec![init_var; n];
let max_lag = p.max(q);
for t in max_lag..n {
let mut var_t = coeffs.omega;
for (i, &alpha_i) in coeffs.alpha.iter().enumerate() {
let r_sq = returns.values[t - i - 1].powi(2);
var_t += alpha_i * r_sq;
}
for (j, &beta_j) in coeffs.beta.iter().enumerate() {
var_t += beta_j * variance[t - j - 1];
}
variance[t] = var_t.max(1e-10);
}
variance
}
fn log_likelihood(returns: &[f64], variance: &[f64]) -> f64 {
let n = returns.len();
let mut ll = 0.0;
for i in 0..n {
if variance[i] > 0.0 {
ll -= 0.5 * (variance[i].ln() + returns[i].powi(2) / variance[i]);
}
}
ll - (n as f64 / 2.0) * (2.0 * std::f64::consts::PI).ln()
}
fn forecast_volatility(
returns: &TimeSeries,
coeffs: &GARCHCoefficients,
horizon: usize,
) -> Vec<f64> {
if horizon == 0 {
return Vec::new();
}
let variance = Self::calculate_variance(returns, coeffs);
let _n = variance.len();
let alpha_sum: f64 = coeffs.alpha.iter().sum();
let beta_sum: f64 = coeffs.beta.iter().sum();
let persistence = alpha_sum + beta_sum;
let long_run_var = if persistence < 0.999 {
coeffs.omega / (1.0 - persistence)
} else {
returns.variance()
};
let mut forecast = Vec::with_capacity(horizon);
let last_var = *variance.last().unwrap_or(&long_run_var);
let last_r_sq = returns
.values
.last()
.map(|r| r.powi(2))
.unwrap_or(long_run_var);
let mut h1_var = coeffs.omega;
if !coeffs.alpha.is_empty() {
h1_var += coeffs.alpha[0] * last_r_sq;
}
if !coeffs.beta.is_empty() {
h1_var += coeffs.beta[0] * last_var;
}
forecast.push(h1_var.sqrt());
let mut prev_var = h1_var;
for _h in 1..horizon {
let h_var = coeffs.omega + persistence * prev_var;
forecast.push(h_var.sqrt());
prev_var = h_var;
}
forecast
}
pub fn parkinson_volatility(high: &[f64], low: &[f64]) -> Vec<f64> {
let factor = 1.0 / (4.0 * 2.0_f64.ln());
high.iter()
.zip(low.iter())
.map(|(&h, &l)| {
if h > l && h > 0.0 && l > 0.0 {
(factor * (h / l).ln().powi(2)).sqrt()
} else {
0.0
}
})
.collect()
}
pub fn garman_klass_volatility(
open: &[f64],
high: &[f64],
low: &[f64],
close: &[f64],
) -> Vec<f64> {
let n = open.len().min(high.len()).min(low.len()).min(close.len());
(0..n)
.map(|i| {
let o = open[i];
let h = high[i];
let l = low[i];
let c = close[i];
if h > 0.0 && l > 0.0 && o > 0.0 && c > 0.0 {
let hl = (h / l).ln();
let co = (c / o).ln();
(0.5 * hl.powi(2) - (2.0 * 2.0_f64.ln() - 1.0) * co.powi(2)).sqrt()
} else {
0.0
}
})
.collect()
}
}
impl GpuKernel for VolatilityAnalysis {
/// Returns this kernel's registration metadata (id, domain, perf targets).
fn metadata(&self) -> &KernelMetadata {
&self.metadata
}
}
#[async_trait]
impl BatchKernel<VolatilityAnalysisInput, VolatilityAnalysisOutput> for VolatilityAnalysis {
    /// Runs a one-shot GARCH volatility analysis over a full return series,
    /// reporting wall-clock compute time in microseconds.
    async fn execute(&self, input: VolatilityAnalysisInput) -> Result<VolatilityAnalysisOutput> {
        let started = Instant::now();
        let result = Self::compute(&input.returns, input.params, input.forecast_horizon);
        let compute_time_us = started.elapsed().as_micros() as u64;
        Ok(VolatilityAnalysisOutput {
            result,
            compute_time_us,
        })
    }
}
#[async_trait]
impl BatchKernel<EWMAVolatilityInput, EWMAVolatilityOutput> for VolatilityAnalysis {
    /// Runs a one-shot EWMA volatility pass over a full return series,
    /// reporting wall-clock compute time in microseconds.
    async fn execute(&self, input: EWMAVolatilityInput) -> Result<EWMAVolatilityOutput> {
        let started = Instant::now();
        let result = Self::compute_ewma(&input.returns, input.lambda, input.forecast_horizon);
        let compute_time_us = started.elapsed().as_micros() as u64;
        Ok(EWMAVolatilityOutput {
            result,
            compute_time_us,
        })
    }
}
#[async_trait]
impl RingKernelHandler<UpdateVolatilityRing, UpdateVolatilityResponse> for VolatilityAnalysis {
    /// Applies one streamed return to the asset's GARCH state and reports
    /// the refreshed volatility/variance as 1e-8 fixed-point integers.
    async fn handle(
        &self,
        _ctx: &mut RingContext,
        msg: UpdateVolatilityRing,
    ) -> Result<UpdateVolatilityResponse> {
        const SCALE: f64 = 100_000_000.0;
        let (volatility, variance, observation_count) =
            self.update_asset(msg.asset_id, msg.return_f64());
        Ok(UpdateVolatilityResponse {
            correlation_id: msg.correlation_id,
            asset_id: msg.asset_id,
            current_volatility: (volatility * SCALE) as i64,
            current_variance: (variance * SCALE) as i64,
            observation_count: observation_count as u32,
        })
    }
}
#[async_trait]
impl RingKernelHandler<QueryVolatilityRing, QueryVolatilityResponse> for VolatilityAnalysis {
    /// Reads an asset's current volatility plus an up-to-10-step forecast,
    /// all in 1e-8 fixed point (persistence in basis-point-like 1e-4 units).
    /// Unknown assets get a synthetic decaying default instead of an error.
    async fn handle(
        &self,
        _ctx: &mut RingContext,
        msg: QueryVolatilityRing,
    ) -> Result<QueryVolatilityResponse> {
        const SCALE: f64 = 100_000_000.0;
        let horizon = msg.horizon.min(10) as usize;
        let (current_vol, forecast_vec, persistence) =
            match self.query_asset(msg.asset_id, horizon) {
                Some(found) => found,
                None => {
                    // Asset never seen: answer with a 2% volatility that
                    // decays geometrically at persistence 0.90.
                    let default_vol: f64 = 0.02;
                    let default_persistence: f64 = 0.90;
                    let synthetic: Vec<f64> = (0..horizon)
                        .map(|i| default_vol * default_persistence.powi(i as i32))
                        .collect();
                    (default_vol, synthetic, default_persistence)
                }
            };
        // Pack at most 10 forecast steps into the fixed-size response array.
        let mut forecast = [0i64; 10];
        for (slot, &vol) in forecast.iter_mut().zip(forecast_vec.iter()) {
            *slot = (vol * SCALE) as i64;
        }
        Ok(QueryVolatilityResponse {
            correlation_id: msg.correlation_id,
            asset_id: msg.asset_id,
            current_volatility: (current_vol * SCALE) as i64,
            forecast,
            forecast_count: forecast_vec.len().min(10) as u8,
            persistence: (persistence * 10000.0) as i32,
        })
    }
}
#[async_trait]
impl RingKernelHandler<UpdateEWMAVolatilityRing, UpdateEWMAVolatilityResponse>
    for VolatilityAnalysis
{
    /// Applies one streamed return (arriving as 1e-8 fixed point) to the
    /// asset's EWMA track and returns the updated variance/volatility in the
    /// same fixed-point encoding.
    async fn handle(
        &self,
        _ctx: &mut RingContext,
        msg: UpdateEWMAVolatilityRing,
    ) -> Result<UpdateEWMAVolatilityResponse> {
        const SCALE: f64 = 100_000_000.0;
        let return_value = msg.return_value as f64 / SCALE;
        let (ewma_variance, ewma_volatility) =
            self.update_ewma(msg.asset_id, return_value, msg.lambda_f64());
        Ok(UpdateEWMAVolatilityResponse {
            correlation_id: msg.correlation_id,
            asset_id: msg.asset_id,
            ewma_variance: (ewma_variance * SCALE) as i64,
            ewma_volatility: (ewma_volatility * SCALE) as i64,
        })
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Builds a 200-point synthetic return series with time-varying volatility
// (a GARCH-like recursion plus a periodic shock) so estimation tests have
// realistic clustered-volatility input.
fn create_volatile_series() -> TimeSeries {
let mut values = Vec::with_capacity(200);
let mut vol = 0.01;
for i in 0..200 {
let shock = if i % 20 < 5 { 0.03 } else { 0.01 };
vol =
0.01 + 0.1 * values.last().map(|r: &f64| r.powi(2)).unwrap_or(0.0001) + 0.85 * vol;
vol = vol.max(0.001);
let r = vol.sqrt() * ((i as f64 * 0.1).sin() * 2.0 - 1.0 + shock);
values.push(r);
}
TimeSeries::new(values)
}
// Kernel registers with the expected id and domain.
#[test]
fn test_volatility_metadata() {
let kernel = VolatilityAnalysis::new();
assert_eq!(kernel.metadata().id, "temporal/volatility-analysis");
assert_eq!(kernel.metadata().domain, Domain::TemporalAnalysis);
}
// GARCH fit produces positive variances, full-length outputs, the requested
// forecast length, and a stationary coefficient set (alpha + beta < 1).
#[test]
fn test_garch_estimation() {
let returns = create_volatile_series();
let params = GARCHParams::new(1, 1);
let result = VolatilityAnalysis::compute(&returns, params, 10);
assert_eq!(result.variance.len(), returns.len());
assert_eq!(result.volatility.len(), returns.len());
assert!(result.variance.iter().all(|&v| v > 0.0));
assert_eq!(result.forecast.len(), 10);
let alpha_sum: f64 = result.coefficients.alpha.iter().sum();
let beta_sum: f64 = result.coefficients.beta.iter().sum();
assert!(
alpha_sum + beta_sum < 1.0,
"Not stationary: alpha={}, beta={}",
alpha_sum,
beta_sum
);
}
// EWMA output encodes lambda as beta[0] and (1 - lambda) as alpha[0].
#[test]
fn test_ewma_volatility() {
let returns = create_volatile_series();
let result = VolatilityAnalysis::compute_ewma(&returns, 0.94, 5);
assert_eq!(result.variance.len(), returns.len());
assert_eq!(result.forecast.len(), 5);
assert!((result.coefficients.beta[0] - 0.94).abs() < 0.01);
assert!((result.coefficients.alpha[0] - 0.06).abs() < 0.01);
}
// Rolling realized volatility is full-length and non-negative.
#[test]
fn test_realized_volatility() {
let returns = create_volatile_series();
let realized = VolatilityAnalysis::compute_realized(&returns, 20);
assert_eq!(realized.len(), returns.len());
assert!(realized.iter().all(|&v| v >= 0.0));
}
// Parkinson estimator: strictly positive when high > low > 0.
#[test]
fn test_parkinson_volatility() {
let high = vec![105.0, 106.0, 107.0, 108.0, 109.0];
let low = vec![95.0, 94.0, 93.0, 92.0, 91.0];
let vol = VolatilityAnalysis::parkinson_volatility(&high, &low);
assert_eq!(vol.len(), 5);
assert!(vol.iter().all(|&v| v > 0.0));
}
// Garman-Klass estimator: non-negative on valid OHLC bars.
#[test]
fn test_garman_klass_volatility() {
let open = vec![100.0, 101.0, 102.0];
let high = vec![105.0, 106.0, 107.0];
let low = vec![95.0, 96.0, 97.0];
let close = vec![102.0, 103.0, 104.0];
let vol = VolatilityAnalysis::garman_klass_volatility(&open, &high, &low, &close);
assert_eq!(vol.len(), 3);
assert!(vol.iter().all(|&v| v >= 0.0));
}
// Long-horizon forecasts should flatten out (mean reversion) and stay positive.
#[test]
fn test_volatility_forecast_convergence() {
let returns = create_volatile_series();
let params = GARCHParams::new(1, 1);
let result = VolatilityAnalysis::compute(&returns, params, 100);
let last_forecasts = &result.forecast[80..100];
let max_diff = last_forecasts
.windows(2)
.map(|w| (w[1] - w[0]).abs())
.fold(0.0_f64, f64::max);
assert!(
max_diff < 1.0,
"Forecasts not converging: max_diff={}",
max_diff
);
assert!(result.forecast.iter().all(|&v| v > 0.0));
}
// Fewer than 10 observations triggers the constant-variance fallback.
#[test]
fn test_short_series() {
let short = TimeSeries::new(vec![0.01, -0.02, 0.015]);
let result = VolatilityAnalysis::compute(&short, GARCHParams::new(1, 1), 5);
assert_eq!(result.variance.len(), 3);
assert_eq!(result.forecast.len(), 5);
}
// Empty input produces empty outputs rather than panicking.
#[test]
fn test_empty_series() {
let empty = TimeSeries::new(Vec::new());
let result = VolatilityAnalysis::compute_ewma(&empty, 0.94, 5);
assert!(result.variance.is_empty());
assert!(result.forecast.is_empty());
}
}