rustkernel_temporal/
volatility.rs

1//! Volatility analysis kernels.
2//!
3//! This module provides volatility modeling:
4//! - GARCH (Generalized Autoregressive Conditional Heteroskedasticity)
5//! - EWMA (Exponentially Weighted Moving Average) volatility
6//! - Realized volatility measures
7
8use std::time::Instant;
9
10use async_trait::async_trait;
11use ringkernel_core::RingContext;
12
13use crate::messages::{
14    EWMAVolatilityInput, EWMAVolatilityOutput, VolatilityAnalysisInput, VolatilityAnalysisOutput,
15};
16use crate::ring_messages::{
17    QueryVolatilityResponse, QueryVolatilityRing, UpdateEWMAVolatilityResponse,
18    UpdateEWMAVolatilityRing, UpdateVolatilityResponse, UpdateVolatilityRing,
19};
20use crate::types::{GARCHCoefficients, GARCHParams, TimeSeries, VolatilityResult};
21use rustkernel_core::{
22    domain::Domain,
23    error::Result,
24    kernel::KernelMetadata,
25    traits::{BatchKernel, GpuKernel, RingKernelHandler},
26};
27
28// ============================================================================
29// Volatility Analysis Kernel
30// ============================================================================
31
32/// Per-asset volatility state for Ring mode operations.
33#[derive(Debug, Clone, Default)]
34pub struct AssetVolatilityState {
35    /// Asset identifier.
36    pub asset_id: u64,
37    /// Current GARCH coefficients.
38    pub coefficients: GARCHCoefficients,
39    /// Current variance estimate.
40    pub variance: f64,
41    /// Current volatility (sqrt variance).
42    pub volatility: f64,
43    /// EWMA variance.
44    pub ewma_variance: f64,
45    /// EWMA lambda.
46    pub ewma_lambda: f64,
47    /// Last return observation.
48    pub last_return: f64,
49    /// Observation count.
50    pub observation_count: u64,
51}
52
53/// Volatility analysis state for Ring mode operations.
54#[derive(Debug, Clone, Default)]
55pub struct VolatilityState {
56    /// Per-asset volatility states.
57    pub assets: std::collections::HashMap<u64, AssetVolatilityState>,
58    /// Default EWMA lambda.
59    pub default_lambda: f64,
60}
61
62/// Volatility analysis kernel using GARCH models.
63///
64/// Estimates conditional volatility using GARCH(p,q) models.
65#[derive(Debug)]
66pub struct VolatilityAnalysis {
67    metadata: KernelMetadata,
68    /// Internal state for Ring mode operations.
69    state: std::sync::RwLock<VolatilityState>,
70}
71
72impl Clone for VolatilityAnalysis {
73    fn clone(&self) -> Self {
74        Self {
75            metadata: self.metadata.clone(),
76            state: std::sync::RwLock::new(self.state.read().unwrap().clone()),
77        }
78    }
79}
80
81impl Default for VolatilityAnalysis {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
87impl VolatilityAnalysis {
88    /// Create a new volatility analysis kernel.
89    #[must_use]
90    pub fn new() -> Self {
91        Self {
92            metadata: KernelMetadata::ring(
93                "temporal/volatility-analysis",
94                Domain::TemporalAnalysis,
95            )
96            .with_description("GARCH model volatility estimation")
97            .with_throughput(100_000)
98            .with_latency_us(10.0),
99            state: std::sync::RwLock::new(VolatilityState {
100                assets: std::collections::HashMap::new(),
101                default_lambda: 0.94,
102            }),
103        }
104    }
105
106    /// Initialize or update GARCH state for an asset.
107    pub fn initialize_asset(&self, asset_id: u64, initial_volatility: f64, lambda: f64) {
108        let mut state = self.state.write().unwrap();
109        let var = initial_volatility * initial_volatility;
110        state.assets.insert(
111            asset_id,
112            AssetVolatilityState {
113                asset_id,
114                coefficients: GARCHCoefficients {
115                    omega: var * 0.1,
116                    alpha: vec![0.1],
117                    beta: vec![0.8],
118                },
119                variance: var,
120                volatility: initial_volatility,
121                ewma_variance: var,
122                ewma_lambda: lambda,
123                last_return: 0.0,
124                observation_count: 1,
125            },
126        );
127    }
128
129    /// Update volatility state with a new return observation.
130    pub fn update_asset(&self, asset_id: u64, return_value: f64) -> (f64, f64, u64) {
131        let mut state = self.state.write().unwrap();
132        let default_lambda = state.default_lambda;
133
134        let asset_state = state.assets.entry(asset_id).or_insert_with(|| {
135            let init_var = return_value * return_value;
136            AssetVolatilityState {
137                asset_id,
138                coefficients: GARCHCoefficients {
139                    omega: init_var.max(0.0001) * 0.1,
140                    alpha: vec![0.1],
141                    beta: vec![0.8],
142                },
143                variance: init_var.max(0.0001),
144                volatility: init_var.max(0.0001).sqrt(),
145                ewma_variance: init_var.max(0.0001),
146                ewma_lambda: default_lambda,
147                last_return: return_value,
148                observation_count: 0,
149            }
150        });
151
152        // GARCH update: σ²_t = ω + α*r²_{t-1} + β*σ²_{t-1}
153        let r_sq = return_value * return_value;
154        let omega = asset_state.coefficients.omega;
155        let alpha = asset_state
156            .coefficients
157            .alpha
158            .first()
159            .copied()
160            .unwrap_or(0.1);
161        let beta = asset_state
162            .coefficients
163            .beta
164            .first()
165            .copied()
166            .unwrap_or(0.8);
167
168        let new_variance = omega + alpha * r_sq + beta * asset_state.variance;
169        asset_state.variance = new_variance.max(1e-10);
170        asset_state.volatility = asset_state.variance.sqrt();
171
172        // EWMA update: σ²_t = λ*σ²_{t-1} + (1-λ)*r²_{t-1}
173        let lambda = asset_state.ewma_lambda;
174        asset_state.ewma_variance = lambda * asset_state.ewma_variance + (1.0 - lambda) * r_sq;
175
176        asset_state.last_return = return_value;
177        asset_state.observation_count += 1;
178
179        (
180            asset_state.volatility,
181            asset_state.variance,
182            asset_state.observation_count,
183        )
184    }
185
186    /// Query volatility state for an asset.
187    pub fn query_asset(&self, asset_id: u64, horizon: usize) -> Option<(f64, Vec<f64>, f64)> {
188        let state = self.state.read().unwrap();
189        state.assets.get(&asset_id).map(|asset_state| {
190            let vol = asset_state.volatility;
191
192            // Generate forecast based on GARCH coefficients
193            let alpha_sum: f64 = asset_state.coefficients.alpha.iter().sum();
194            let beta_sum: f64 = asset_state.coefficients.beta.iter().sum();
195            let persistence = (alpha_sum + beta_sum).min(0.999);
196
197            let _long_run_var = if persistence < 0.999 {
198                asset_state.coefficients.omega / (1.0 - persistence)
199            } else {
200                asset_state.variance
201            };
202
203            let mut forecast = Vec::with_capacity(horizon);
204            let mut prev_var = asset_state.variance;
205
206            for _ in 0..horizon {
207                // Mean reversion toward long-run variance
208                prev_var = asset_state.coefficients.omega + persistence * prev_var;
209                forecast.push(prev_var.sqrt());
210            }
211
212            (vol, forecast, persistence)
213        })
214    }
215
216    /// Update EWMA volatility for an asset.
217    pub fn update_ewma(&self, asset_id: u64, return_value: f64, lambda: f64) -> (f64, f64) {
218        let mut state = self.state.write().unwrap();
219        let r_sq = return_value * return_value;
220
221        let asset_state = state
222            .assets
223            .entry(asset_id)
224            .or_insert_with(|| AssetVolatilityState {
225                asset_id,
226                coefficients: GARCHCoefficients::default(),
227                variance: r_sq.max(0.0001),
228                volatility: r_sq.max(0.0001).sqrt(),
229                ewma_variance: r_sq.max(0.0001),
230                ewma_lambda: lambda,
231                last_return: return_value,
232                observation_count: 0,
233            });
234
235        // Update EWMA: σ²_t = λ*σ²_{t-1} + (1-λ)*r²
236        asset_state.ewma_variance = lambda * asset_state.ewma_variance + (1.0 - lambda) * r_sq;
237        asset_state.ewma_lambda = lambda;
238        asset_state.observation_count += 1;
239
240        (asset_state.ewma_variance, asset_state.ewma_variance.sqrt())
241    }
242
243    /// Estimate volatility using GARCH model.
244    ///
245    /// # Arguments
246    /// * `returns` - Time series of returns (log returns preferred)
247    /// * `params` - GARCH(p,q) parameters
248    /// * `forecast_horizon` - Number of periods to forecast
249    pub fn compute(
250        returns: &TimeSeries,
251        params: GARCHParams,
252        forecast_horizon: usize,
253    ) -> VolatilityResult {
254        if returns.len() < 10 {
255            let var = returns.variance();
256            return VolatilityResult {
257                variance: vec![var; returns.len()],
258                volatility: vec![var.sqrt(); returns.len()],
259                coefficients: GARCHCoefficients {
260                    omega: var,
261                    alpha: Vec::new(),
262                    beta: Vec::new(),
263                },
264                forecast: vec![var.sqrt(); forecast_horizon],
265            };
266        }
267
268        // Fit GARCH model
269        let coefficients = Self::fit_garch(returns, params);
270
271        // Calculate conditional variance series
272        let variance = Self::calculate_variance(returns, &coefficients);
273
274        // Calculate volatility (sqrt of variance)
275        let volatility: Vec<f64> = variance.iter().map(|v| v.sqrt()).collect();
276
277        // Forecast volatility
278        let forecast = Self::forecast_volatility(returns, &coefficients, forecast_horizon);
279
280        VolatilityResult {
281            variance,
282            volatility,
283            coefficients,
284            forecast,
285        }
286    }
287
288    /// EWMA (RiskMetrics-style) volatility estimation.
289    ///
290    /// # Arguments
291    /// * `returns` - Time series of returns
292    /// * `lambda` - Decay factor (typically 0.94 for daily data)
293    /// * `forecast_horizon` - Number of periods to forecast
294    pub fn compute_ewma(
295        returns: &TimeSeries,
296        lambda: f64,
297        forecast_horizon: usize,
298    ) -> VolatilityResult {
299        let n = returns.len();
300        if n == 0 {
301            return VolatilityResult {
302                variance: Vec::new(),
303                volatility: Vec::new(),
304                coefficients: GARCHCoefficients {
305                    omega: 0.0,
306                    alpha: vec![1.0 - lambda],
307                    beta: vec![lambda],
308                },
309                forecast: Vec::new(),
310            };
311        }
312
313        // Initial variance estimate
314        let initial_var = returns.variance();
315        let mut variance = vec![0.0; n];
316        variance[0] = initial_var;
317
318        // EWMA recursion: σ²_t = λσ²_{t-1} + (1-λ)r²_{t-1}
319        for i in 1..n {
320            let r_sq = returns.values[i - 1].powi(2);
321            variance[i] = lambda * variance[i - 1] + (1.0 - lambda) * r_sq;
322        }
323
324        let volatility: Vec<f64> = variance.iter().map(|v| v.sqrt()).collect();
325
326        // EWMA forecast is flat (variance reverts to last estimate)
327        let last_var = *variance.last().unwrap_or(&initial_var);
328        let forecast = vec![last_var.sqrt(); forecast_horizon];
329
330        VolatilityResult {
331            variance,
332            volatility,
333            coefficients: GARCHCoefficients {
334                omega: 0.0, // EWMA has no constant
335                alpha: vec![1.0 - lambda],
336                beta: vec![lambda],
337            },
338            forecast,
339        }
340    }
341
342    /// Realized volatility (sum of squared returns).
343    ///
344    /// # Arguments
345    /// * `returns` - High-frequency returns
346    /// * `window` - Window for realized volatility calculation
347    #[allow(clippy::needless_range_loop)]
348    pub fn compute_realized(returns: &TimeSeries, window: usize) -> Vec<f64> {
349        let n = returns.len();
350        let w = window.min(n).max(1);
351        let mut realized = vec![0.0; n];
352
353        for i in 0..n {
354            let start = i.saturating_sub(w - 1);
355            let sq_sum: f64 = returns.values[start..=i].iter().map(|r| r.powi(2)).sum();
356            realized[i] = (sq_sum / (i - start + 1) as f64).sqrt();
357        }
358
359        realized
360    }
361
362    /// Fit GARCH(p,q) model using simplified MLE.
363    fn fit_garch(returns: &TimeSeries, params: GARCHParams) -> GARCHCoefficients {
364        let _n = returns.len();
365        let unconditional_var = returns.variance();
366
367        // Start with reasonable initial values
368        // For GARCH(1,1): omega + alpha + beta ≈ 1 for stationarity
369        let p = params.p.max(1);
370        let q = params.q.max(1);
371
372        // Initial guesses
373        let mut omega = unconditional_var * 0.1;
374        let mut alpha = vec![0.1 / p as f64; p];
375        let mut beta = vec![0.8 / q as f64; q];
376
377        // Simple grid search optimization (simplified from full MLE)
378        let mut best_likelihood = f64::NEG_INFINITY;
379        let mut best_coeffs = (omega, alpha.clone(), beta.clone());
380
381        // Grid search over parameter space
382        for omega_scale in [0.05, 0.1, 0.15, 0.2] {
383            for alpha_sum in [0.05, 0.1, 0.15, 0.2] {
384                for beta_sum in [0.7, 0.75, 0.8, 0.85, 0.9] {
385                    // Check stationarity: alpha + beta < 1
386                    if alpha_sum + beta_sum >= 0.999 {
387                        continue;
388                    }
389
390                    let test_omega = unconditional_var * omega_scale;
391                    let test_alpha: Vec<f64> = (0..p).map(|_| alpha_sum / p as f64).collect();
392                    let test_beta: Vec<f64> = (0..q).map(|_| beta_sum / q as f64).collect();
393
394                    let test_coeffs = GARCHCoefficients {
395                        omega: test_omega,
396                        alpha: test_alpha.clone(),
397                        beta: test_beta.clone(),
398                    };
399
400                    let variance = Self::calculate_variance(returns, &test_coeffs);
401                    let likelihood = Self::log_likelihood(&returns.values, &variance);
402
403                    if likelihood > best_likelihood && likelihood.is_finite() {
404                        best_likelihood = likelihood;
405                        best_coeffs = (test_omega, test_alpha, test_beta);
406                    }
407                }
408            }
409        }
410
411        omega = best_coeffs.0;
412        alpha = best_coeffs.1;
413        beta = best_coeffs.2;
414
415        // Local refinement using gradient-free optimization
416        for _ in 0..10 {
417            let current_coeffs = GARCHCoefficients {
418                omega,
419                alpha: alpha.clone(),
420                beta: beta.clone(),
421            };
422            let current_variance = Self::calculate_variance(returns, &current_coeffs);
423            let current_ll = Self::log_likelihood(&returns.values, &current_variance);
424
425            // Try small perturbations
426            let delta = 0.01;
427            let mut improved = false;
428
429            // Perturb omega
430            for sign in [-1.0, 1.0] {
431                let new_omega = (omega + sign * delta * unconditional_var).max(1e-10);
432                let test_coeffs = GARCHCoefficients {
433                    omega: new_omega,
434                    alpha: alpha.clone(),
435                    beta: beta.clone(),
436                };
437                let test_var = Self::calculate_variance(returns, &test_coeffs);
438                let test_ll = Self::log_likelihood(&returns.values, &test_var);
439
440                if test_ll > current_ll && test_ll.is_finite() {
441                    omega = new_omega;
442                    improved = true;
443                    break;
444                }
445            }
446
447            // Perturb alpha
448            for i in 0..p {
449                for sign in [-1.0, 1.0] {
450                    let mut new_alpha = alpha.clone();
451                    new_alpha[i] = (new_alpha[i] + sign * delta).clamp(0.001, 0.5);
452
453                    let alpha_sum: f64 = new_alpha.iter().sum();
454                    let beta_sum: f64 = beta.iter().sum();
455                    if alpha_sum + beta_sum >= 0.999 {
456                        continue;
457                    }
458
459                    let test_coeffs = GARCHCoefficients {
460                        omega,
461                        alpha: new_alpha.clone(),
462                        beta: beta.clone(),
463                    };
464                    let test_var = Self::calculate_variance(returns, &test_coeffs);
465                    let test_ll = Self::log_likelihood(&returns.values, &test_var);
466
467                    if test_ll > current_ll && test_ll.is_finite() {
468                        alpha = new_alpha;
469                        improved = true;
470                        break;
471                    }
472                }
473            }
474
475            // Perturb beta
476            for i in 0..q {
477                for sign in [-1.0, 1.0] {
478                    let mut new_beta = beta.clone();
479                    new_beta[i] = (new_beta[i] + sign * delta).clamp(0.001, 0.99);
480
481                    let alpha_sum: f64 = alpha.iter().sum();
482                    let beta_sum: f64 = new_beta.iter().sum();
483                    if alpha_sum + beta_sum >= 0.999 {
484                        continue;
485                    }
486
487                    let test_coeffs = GARCHCoefficients {
488                        omega,
489                        alpha: alpha.clone(),
490                        beta: new_beta.clone(),
491                    };
492                    let test_var = Self::calculate_variance(returns, &test_coeffs);
493                    let test_ll = Self::log_likelihood(&returns.values, &test_var);
494
495                    if test_ll > current_ll && test_ll.is_finite() {
496                        beta = new_beta;
497                        improved = true;
498                        break;
499                    }
500                }
501            }
502
503            if !improved {
504                break;
505            }
506        }
507
508        GARCHCoefficients { omega, alpha, beta }
509    }
510
511    /// Calculate conditional variance series.
512    fn calculate_variance(returns: &TimeSeries, coeffs: &GARCHCoefficients) -> Vec<f64> {
513        let n = returns.len();
514        let p = coeffs.alpha.len();
515        let q = coeffs.beta.len();
516
517        // Initial variance
518        let init_var = returns.variance().max(1e-10);
519        let mut variance = vec![init_var; n];
520
521        let max_lag = p.max(q);
522
523        for t in max_lag..n {
524            let mut var_t = coeffs.omega;
525
526            // ARCH terms: sum(alpha_i * r²_{t-i})
527            for (i, &alpha_i) in coeffs.alpha.iter().enumerate() {
528                let r_sq = returns.values[t - i - 1].powi(2);
529                var_t += alpha_i * r_sq;
530            }
531
532            // GARCH terms: sum(beta_j * σ²_{t-j})
533            for (j, &beta_j) in coeffs.beta.iter().enumerate() {
534                var_t += beta_j * variance[t - j - 1];
535            }
536
537            variance[t] = var_t.max(1e-10);
538        }
539
540        variance
541    }
542
543    /// Gaussian log-likelihood.
544    fn log_likelihood(returns: &[f64], variance: &[f64]) -> f64 {
545        let n = returns.len();
546        let mut ll = 0.0;
547
548        for i in 0..n {
549            if variance[i] > 0.0 {
550                ll -= 0.5 * (variance[i].ln() + returns[i].powi(2) / variance[i]);
551            }
552        }
553
554        ll - (n as f64 / 2.0) * (2.0 * std::f64::consts::PI).ln()
555    }
556
557    /// Forecast volatility.
558    fn forecast_volatility(
559        returns: &TimeSeries,
560        coeffs: &GARCHCoefficients,
561        horizon: usize,
562    ) -> Vec<f64> {
563        if horizon == 0 {
564            return Vec::new();
565        }
566
567        let variance = Self::calculate_variance(returns, coeffs);
568        let _n = variance.len();
569
570        // Long-run variance: omega / (1 - alpha - beta)
571        let alpha_sum: f64 = coeffs.alpha.iter().sum();
572        let beta_sum: f64 = coeffs.beta.iter().sum();
573        let persistence = alpha_sum + beta_sum;
574
575        let long_run_var = if persistence < 0.999 {
576            coeffs.omega / (1.0 - persistence)
577        } else {
578            returns.variance()
579        };
580
581        let mut forecast = Vec::with_capacity(horizon);
582        let last_var = *variance.last().unwrap_or(&long_run_var);
583        let last_r_sq = returns
584            .values
585            .last()
586            .map(|r| r.powi(2))
587            .unwrap_or(long_run_var);
588
589        // One-step forecast
590        let mut h1_var = coeffs.omega;
591        if !coeffs.alpha.is_empty() {
592            h1_var += coeffs.alpha[0] * last_r_sq;
593        }
594        if !coeffs.beta.is_empty() {
595            h1_var += coeffs.beta[0] * last_var;
596        }
597        forecast.push(h1_var.sqrt());
598
599        // Multi-step forecasts converge to long-run volatility
600        let mut prev_var = h1_var;
601        for _h in 1..horizon {
602            // E[σ²_{t+h}] = ω + (α+β)E[σ²_{t+h-1}]
603            // Converges to long_run_var as h → ∞
604            let h_var = coeffs.omega + persistence * prev_var;
605            forecast.push(h_var.sqrt());
606            prev_var = h_var;
607        }
608
609        forecast
610    }
611
612    /// Parkinson volatility estimator (high-low range).
613    pub fn parkinson_volatility(high: &[f64], low: &[f64]) -> Vec<f64> {
614        let factor = 1.0 / (4.0 * 2.0_f64.ln());
615
616        high.iter()
617            .zip(low.iter())
618            .map(|(&h, &l)| {
619                if h > l && h > 0.0 && l > 0.0 {
620                    (factor * (h / l).ln().powi(2)).sqrt()
621                } else {
622                    0.0
623                }
624            })
625            .collect()
626    }
627
628    /// Garman-Klass volatility estimator (OHLC data).
629    pub fn garman_klass_volatility(
630        open: &[f64],
631        high: &[f64],
632        low: &[f64],
633        close: &[f64],
634    ) -> Vec<f64> {
635        let n = open.len().min(high.len()).min(low.len()).min(close.len());
636
637        (0..n)
638            .map(|i| {
639                let o = open[i];
640                let h = high[i];
641                let l = low[i];
642                let c = close[i];
643
644                if h > 0.0 && l > 0.0 && o > 0.0 && c > 0.0 {
645                    let hl = (h / l).ln();
646                    let co = (c / o).ln();
647                    (0.5 * hl.powi(2) - (2.0 * 2.0_f64.ln() - 1.0) * co.powi(2)).sqrt()
648                } else {
649                    0.0
650                }
651            })
652            .collect()
653    }
654}
655
656impl GpuKernel for VolatilityAnalysis {
657    fn metadata(&self) -> &KernelMetadata {
658        &self.metadata
659    }
660}
661
662#[async_trait]
663impl BatchKernel<VolatilityAnalysisInput, VolatilityAnalysisOutput> for VolatilityAnalysis {
664    async fn execute(&self, input: VolatilityAnalysisInput) -> Result<VolatilityAnalysisOutput> {
665        let start = Instant::now();
666        let result = Self::compute(&input.returns, input.params, input.forecast_horizon);
667        Ok(VolatilityAnalysisOutput {
668            result,
669            compute_time_us: start.elapsed().as_micros() as u64,
670        })
671    }
672}
673
674#[async_trait]
675impl BatchKernel<EWMAVolatilityInput, EWMAVolatilityOutput> for VolatilityAnalysis {
676    async fn execute(&self, input: EWMAVolatilityInput) -> Result<EWMAVolatilityOutput> {
677        let start = Instant::now();
678        let result = Self::compute_ewma(&input.returns, input.lambda, input.forecast_horizon);
679        Ok(EWMAVolatilityOutput {
680            result,
681            compute_time_us: start.elapsed().as_micros() as u64,
682        })
683    }
684}
685
686// ============================================================================
687// Ring Kernel Handler Implementations
688// ============================================================================
689
690/// Ring handler for GARCH volatility updates.
691#[async_trait]
692impl RingKernelHandler<UpdateVolatilityRing, UpdateVolatilityResponse> for VolatilityAnalysis {
693    async fn handle(
694        &self,
695        _ctx: &mut RingContext,
696        msg: UpdateVolatilityRing,
697    ) -> Result<UpdateVolatilityResponse> {
698        // Update GARCH state with new observation
699        let return_value = msg.return_f64();
700        let (volatility, variance, observation_count) =
701            self.update_asset(msg.asset_id, return_value);
702
703        Ok(UpdateVolatilityResponse {
704            correlation_id: msg.correlation_id,
705            asset_id: msg.asset_id,
706            current_volatility: (volatility * 100_000_000.0) as i64,
707            current_variance: (variance * 100_000_000.0) as i64,
708            observation_count: observation_count as u32,
709        })
710    }
711}
712
713/// Ring handler for volatility queries.
714#[async_trait]
715impl RingKernelHandler<QueryVolatilityRing, QueryVolatilityResponse> for VolatilityAnalysis {
716    async fn handle(
717        &self,
718        _ctx: &mut RingContext,
719        msg: QueryVolatilityRing,
720    ) -> Result<QueryVolatilityResponse> {
721        let horizon = msg.horizon.min(10) as usize;
722
723        // Query internal state for this asset
724        let (current_vol, forecast_vec, persistence) =
725            self.query_asset(msg.asset_id, horizon).unwrap_or_else(|| {
726                // Default values if asset not found
727                let default_vol: f64 = 0.02;
728                let default_persistence: f64 = 0.90;
729                let forecast: Vec<f64> = (0..horizon)
730                    .map(|i| default_vol * default_persistence.powi(i as i32))
731                    .collect();
732                (default_vol, forecast, default_persistence)
733            });
734
735        // Convert forecast to fixed-point array
736        let mut forecast = [0i64; 10];
737        for (i, &vol) in forecast_vec.iter().take(10).enumerate() {
738            forecast[i] = (vol * 100_000_000.0) as i64;
739        }
740
741        Ok(QueryVolatilityResponse {
742            correlation_id: msg.correlation_id,
743            asset_id: msg.asset_id,
744            current_volatility: (current_vol * 100_000_000.0) as i64,
745            forecast,
746            forecast_count: forecast_vec.len().min(10) as u8,
747            persistence: (persistence * 10000.0) as i32,
748        })
749    }
750}
751
752/// Ring handler for EWMA volatility updates.
753#[async_trait]
754impl RingKernelHandler<UpdateEWMAVolatilityRing, UpdateEWMAVolatilityResponse>
755    for VolatilityAnalysis
756{
757    async fn handle(
758        &self,
759        _ctx: &mut RingContext,
760        msg: UpdateEWMAVolatilityRing,
761    ) -> Result<UpdateEWMAVolatilityResponse> {
762        // Update EWMA state with new observation
763        let return_value = msg.return_value as f64 / 100_000_000.0;
764        let lambda = msg.lambda_f64();
765
766        // Update running EWMA state
767        let (ewma_variance, ewma_volatility) = self.update_ewma(msg.asset_id, return_value, lambda);
768
769        Ok(UpdateEWMAVolatilityResponse {
770            correlation_id: msg.correlation_id,
771            asset_id: msg.asset_id,
772            ewma_variance: (ewma_variance * 100_000_000.0) as i64,
773            ewma_volatility: (ewma_volatility * 100_000_000.0) as i64,
774        })
775    }
776}
777
778#[cfg(test)]
779mod tests {
780    use super::*;
781
782    fn create_volatile_series() -> TimeSeries {
783        // Simulate returns with clustering volatility
784        let mut values = Vec::with_capacity(200);
785        let mut vol = 0.01;
786
787        for i in 0..200 {
788            // GARCH-like volatility clustering
789            let shock = if i % 20 < 5 { 0.03 } else { 0.01 };
790            vol =
791                0.01 + 0.1 * values.last().map(|r: &f64| r.powi(2)).unwrap_or(0.0001) + 0.85 * vol;
792            vol = vol.max(0.001);
793
794            // Generate return
795            let r = vol.sqrt() * ((i as f64 * 0.1).sin() * 2.0 - 1.0 + shock);
796            values.push(r);
797        }
798
799        TimeSeries::new(values)
800    }
801
802    #[test]
803    fn test_volatility_metadata() {
804        let kernel = VolatilityAnalysis::new();
805        assert_eq!(kernel.metadata().id, "temporal/volatility-analysis");
806        assert_eq!(kernel.metadata().domain, Domain::TemporalAnalysis);
807    }
808
809    #[test]
810    fn test_garch_estimation() {
811        let returns = create_volatile_series();
812        let params = GARCHParams::new(1, 1);
813        let result = VolatilityAnalysis::compute(&returns, params, 10);
814
815        // Should have variance for each observation
816        assert_eq!(result.variance.len(), returns.len());
817        assert_eq!(result.volatility.len(), returns.len());
818
819        // Variance should be positive
820        assert!(result.variance.iter().all(|&v| v > 0.0));
821
822        // Should have forecasts
823        assert_eq!(result.forecast.len(), 10);
824
825        // Coefficients should satisfy stationarity
826        let alpha_sum: f64 = result.coefficients.alpha.iter().sum();
827        let beta_sum: f64 = result.coefficients.beta.iter().sum();
828        assert!(
829            alpha_sum + beta_sum < 1.0,
830            "Not stationary: alpha={}, beta={}",
831            alpha_sum,
832            beta_sum
833        );
834    }
835
836    #[test]
837    fn test_ewma_volatility() {
838        let returns = create_volatile_series();
839        let result = VolatilityAnalysis::compute_ewma(&returns, 0.94, 5);
840
841        assert_eq!(result.variance.len(), returns.len());
842        assert_eq!(result.forecast.len(), 5);
843
844        // EWMA coefficients should reflect lambda
845        assert!((result.coefficients.beta[0] - 0.94).abs() < 0.01);
846        assert!((result.coefficients.alpha[0] - 0.06).abs() < 0.01);
847    }
848
849    #[test]
850    fn test_realized_volatility() {
851        let returns = create_volatile_series();
852        let realized = VolatilityAnalysis::compute_realized(&returns, 20);
853
854        assert_eq!(realized.len(), returns.len());
855        assert!(realized.iter().all(|&v| v >= 0.0));
856    }
857
858    #[test]
859    fn test_parkinson_volatility() {
860        let high = vec![105.0, 106.0, 107.0, 108.0, 109.0];
861        let low = vec![95.0, 94.0, 93.0, 92.0, 91.0];
862
863        let vol = VolatilityAnalysis::parkinson_volatility(&high, &low);
864
865        assert_eq!(vol.len(), 5);
866        assert!(vol.iter().all(|&v| v > 0.0));
867    }
868
869    #[test]
870    fn test_garman_klass_volatility() {
871        let open = vec![100.0, 101.0, 102.0];
872        let high = vec![105.0, 106.0, 107.0];
873        let low = vec![95.0, 96.0, 97.0];
874        let close = vec![102.0, 103.0, 104.0];
875
876        let vol = VolatilityAnalysis::garman_klass_volatility(&open, &high, &low, &close);
877
878        assert_eq!(vol.len(), 3);
879        assert!(vol.iter().all(|&v| v >= 0.0));
880    }
881
882    #[test]
883    fn test_volatility_forecast_convergence() {
884        let returns = create_volatile_series();
885        let params = GARCHParams::new(1, 1);
886        let result = VolatilityAnalysis::compute(&returns, params, 100);
887
888        // Forecasts should converge to long-run volatility
889        // Rate depends on persistence (alpha+beta), so use longer horizon
890        let last_forecasts = &result.forecast[80..100];
891        let max_diff = last_forecasts
892            .windows(2)
893            .map(|w| (w[1] - w[0]).abs())
894            .fold(0.0_f64, f64::max);
895
896        // Convergence can be slow for high persistence, allow larger tolerance
897        assert!(
898            max_diff < 1.0,
899            "Forecasts not converging: max_diff={}",
900            max_diff
901        );
902
903        // Also verify that forecasts are positive and decreasing variance in changes
904        assert!(result.forecast.iter().all(|&v| v > 0.0));
905    }
906
907    #[test]
908    fn test_short_series() {
909        let short = TimeSeries::new(vec![0.01, -0.02, 0.015]);
910        let result = VolatilityAnalysis::compute(&short, GARCHParams::new(1, 1), 5);
911
912        // Should handle gracefully
913        assert_eq!(result.variance.len(), 3);
914        assert_eq!(result.forecast.len(), 5);
915    }
916
917    #[test]
918    fn test_empty_series() {
919        let empty = TimeSeries::new(Vec::new());
920        let result = VolatilityAnalysis::compute_ewma(&empty, 0.94, 5);
921
922        assert!(result.variance.is_empty());
923        assert!(result.forecast.is_empty());
924    }
925}