rustkernel_temporal/
volatility.rs

1//! Volatility analysis kernels.
2//!
3//! This module provides volatility modeling:
4//! - GARCH (Generalized Autoregressive Conditional Heteroskedasticity)
5//! - EWMA (Exponentially Weighted Moving Average) volatility
6//! - Realized volatility measures
7
8use std::time::Instant;
9
10use async_trait::async_trait;
11use ringkernel_core::RingContext;
12
13use crate::messages::{
14    EWMAVolatilityInput, EWMAVolatilityOutput, VolatilityAnalysisInput, VolatilityAnalysisOutput,
15};
16use crate::ring_messages::{
17    QueryVolatilityResponse, QueryVolatilityRing, UpdateEWMAVolatilityResponse,
18    UpdateEWMAVolatilityRing, UpdateVolatilityResponse, UpdateVolatilityRing,
19};
20use crate::types::{GARCHCoefficients, GARCHParams, TimeSeries, VolatilityResult};
21use rustkernel_core::{
22    domain::Domain,
23    error::Result,
24    kernel::KernelMetadata,
25    traits::{BatchKernel, GpuKernel, RingKernelHandler},
26};
27
28// ============================================================================
29// Volatility Analysis Kernel
30// ============================================================================
31
32/// Per-asset volatility state for Ring mode operations.
33#[derive(Debug, Clone, Default)]
34pub struct AssetVolatilityState {
35    /// Asset identifier.
36    pub asset_id: u64,
37    /// Current GARCH coefficients.
38    pub coefficients: GARCHCoefficients,
39    /// Current variance estimate.
40    pub variance: f64,
41    /// Current volatility (sqrt variance).
42    pub volatility: f64,
43    /// EWMA variance.
44    pub ewma_variance: f64,
45    /// EWMA lambda.
46    pub ewma_lambda: f64,
47    /// Last return observation.
48    pub last_return: f64,
49    /// Observation count.
50    pub observation_count: u64,
51}
52
53/// Volatility analysis state for Ring mode operations.
54#[derive(Debug, Clone, Default)]
55pub struct VolatilityState {
56    /// Per-asset volatility states.
57    pub assets: std::collections::HashMap<u64, AssetVolatilityState>,
58    /// Default EWMA lambda.
59    pub default_lambda: f64,
60}
61
62/// Volatility analysis kernel using GARCH models.
63///
64/// Estimates conditional volatility using GARCH(p,q) models.
65#[derive(Debug)]
66pub struct VolatilityAnalysis {
67    metadata: KernelMetadata,
68    /// Internal state for Ring mode operations.
69    state: std::sync::RwLock<VolatilityState>,
70}
71
72impl Clone for VolatilityAnalysis {
73    fn clone(&self) -> Self {
74        Self {
75            metadata: self.metadata.clone(),
76            state: std::sync::RwLock::new(self.state.read().unwrap().clone()),
77        }
78    }
79}
80
81impl Default for VolatilityAnalysis {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
87impl VolatilityAnalysis {
88    /// Create a new volatility analysis kernel.
89    #[must_use]
90    pub fn new() -> Self {
91        Self {
92            metadata: KernelMetadata::ring(
93                "temporal/volatility-analysis",
94                Domain::TemporalAnalysis,
95            )
96            .with_description("GARCH model volatility estimation")
97            .with_throughput(100_000)
98            .with_latency_us(10.0),
99            state: std::sync::RwLock::new(VolatilityState {
100                assets: std::collections::HashMap::new(),
101                default_lambda: 0.94,
102            }),
103        }
104    }
105
106    /// Initialize or update GARCH state for an asset.
107    pub fn initialize_asset(&self, asset_id: u64, initial_volatility: f64, lambda: f64) {
108        let mut state = self.state.write().unwrap();
109        let var = initial_volatility * initial_volatility;
110        state.assets.insert(
111            asset_id,
112            AssetVolatilityState {
113                asset_id,
114                coefficients: GARCHCoefficients {
115                    omega: var * 0.1,
116                    alpha: vec![0.1],
117                    beta: vec![0.8],
118                },
119                variance: var,
120                volatility: initial_volatility,
121                ewma_variance: var,
122                ewma_lambda: lambda,
123                last_return: 0.0,
124                observation_count: 1,
125            },
126        );
127    }
128
129    /// Update volatility state with a new return observation.
130    pub fn update_asset(&self, asset_id: u64, return_value: f64) -> (f64, f64, u64) {
131        let mut state = self.state.write().unwrap();
132        let default_lambda = state.default_lambda;
133
134        let asset_state = state.assets.entry(asset_id).or_insert_with(|| {
135            let init_var = return_value * return_value;
136            AssetVolatilityState {
137                asset_id,
138                coefficients: GARCHCoefficients {
139                    omega: init_var.max(0.0001) * 0.1,
140                    alpha: vec![0.1],
141                    beta: vec![0.8],
142                },
143                variance: init_var.max(0.0001),
144                volatility: init_var.max(0.0001).sqrt(),
145                ewma_variance: init_var.max(0.0001),
146                ewma_lambda: default_lambda,
147                last_return: return_value,
148                observation_count: 0,
149            }
150        });
151
152        // GARCH update: σ²_t = ω + α*r²_{t-1} + β*σ²_{t-1}
153        let r_sq = return_value * return_value;
154        let omega = asset_state.coefficients.omega;
155        let alpha = asset_state
156            .coefficients
157            .alpha
158            .first()
159            .copied()
160            .unwrap_or(0.1);
161        let beta = asset_state
162            .coefficients
163            .beta
164            .first()
165            .copied()
166            .unwrap_or(0.8);
167
168        let new_variance = omega + alpha * r_sq + beta * asset_state.variance;
169        asset_state.variance = new_variance.max(1e-10);
170        asset_state.volatility = asset_state.variance.sqrt();
171
172        // EWMA update: σ²_t = λ*σ²_{t-1} + (1-λ)*r²_{t-1}
173        let lambda = asset_state.ewma_lambda;
174        asset_state.ewma_variance = lambda * asset_state.ewma_variance + (1.0 - lambda) * r_sq;
175
176        asset_state.last_return = return_value;
177        asset_state.observation_count += 1;
178
179        (
180            asset_state.volatility,
181            asset_state.variance,
182            asset_state.observation_count,
183        )
184    }
185
186    /// Query volatility state for an asset.
187    pub fn query_asset(&self, asset_id: u64, horizon: usize) -> Option<(f64, Vec<f64>, f64)> {
188        let state = self.state.read().unwrap();
189        state.assets.get(&asset_id).map(|asset_state| {
190            let vol = asset_state.volatility;
191
192            // Generate forecast based on GARCH coefficients
193            let alpha_sum: f64 = asset_state.coefficients.alpha.iter().sum();
194            let beta_sum: f64 = asset_state.coefficients.beta.iter().sum();
195            let persistence = (alpha_sum + beta_sum).min(0.999);
196
197            let _long_run_var = if persistence < 0.999 {
198                asset_state.coefficients.omega / (1.0 - persistence)
199            } else {
200                asset_state.variance
201            };
202
203            let mut forecast = Vec::with_capacity(horizon);
204            let mut prev_var = asset_state.variance;
205
206            for _ in 0..horizon {
207                // Mean reversion toward long-run variance
208                prev_var = asset_state.coefficients.omega + persistence * prev_var;
209                forecast.push(prev_var.sqrt());
210            }
211
212            (vol, forecast, persistence)
213        })
214    }
215
216    /// Update EWMA volatility for an asset.
217    pub fn update_ewma(&self, asset_id: u64, return_value: f64, lambda: f64) -> (f64, f64) {
218        let mut state = self.state.write().unwrap();
219        let r_sq = return_value * return_value;
220
221        let asset_state = state
222            .assets
223            .entry(asset_id)
224            .or_insert_with(|| AssetVolatilityState {
225                asset_id,
226                coefficients: GARCHCoefficients::default(),
227                variance: r_sq.max(0.0001),
228                volatility: r_sq.max(0.0001).sqrt(),
229                ewma_variance: r_sq.max(0.0001),
230                ewma_lambda: lambda,
231                last_return: return_value,
232                observation_count: 0,
233            });
234
235        // Update EWMA: σ²_t = λ*σ²_{t-1} + (1-λ)*r²
236        asset_state.ewma_variance = lambda * asset_state.ewma_variance + (1.0 - lambda) * r_sq;
237        asset_state.ewma_lambda = lambda;
238        asset_state.observation_count += 1;
239
240        (asset_state.ewma_variance, asset_state.ewma_variance.sqrt())
241    }
242
243    /// Estimate volatility using GARCH model.
244    ///
245    /// # Arguments
246    /// * `returns` - Time series of returns (log returns preferred)
247    /// * `params` - GARCH(p,q) parameters
248    /// * `forecast_horizon` - Number of periods to forecast
249    pub fn compute(
250        returns: &TimeSeries,
251        params: GARCHParams,
252        forecast_horizon: usize,
253    ) -> VolatilityResult {
254        if returns.len() < 10 {
255            let var = returns.variance();
256            return VolatilityResult {
257                variance: vec![var; returns.len()],
258                volatility: vec![var.sqrt(); returns.len()],
259                coefficients: GARCHCoefficients {
260                    omega: var,
261                    alpha: Vec::new(),
262                    beta: Vec::new(),
263                },
264                forecast: vec![var.sqrt(); forecast_horizon],
265            };
266        }
267
268        // Fit GARCH model
269        let coefficients = Self::fit_garch(returns, params);
270
271        // Calculate conditional variance series
272        let variance = Self::calculate_variance(returns, &coefficients);
273
274        // Calculate volatility (sqrt of variance)
275        let volatility: Vec<f64> = variance.iter().map(|v| v.sqrt()).collect();
276
277        // Forecast volatility
278        let forecast = Self::forecast_volatility(returns, &coefficients, forecast_horizon);
279
280        VolatilityResult {
281            variance,
282            volatility,
283            coefficients,
284            forecast,
285        }
286    }
287
288    /// EWMA (RiskMetrics-style) volatility estimation.
289    ///
290    /// # Arguments
291    /// * `returns` - Time series of returns
292    /// * `lambda` - Decay factor (typically 0.94 for daily data)
293    /// * `forecast_horizon` - Number of periods to forecast
294    pub fn compute_ewma(
295        returns: &TimeSeries,
296        lambda: f64,
297        forecast_horizon: usize,
298    ) -> VolatilityResult {
299        let n = returns.len();
300        if n == 0 {
301            return VolatilityResult {
302                variance: Vec::new(),
303                volatility: Vec::new(),
304                coefficients: GARCHCoefficients {
305                    omega: 0.0,
306                    alpha: vec![1.0 - lambda],
307                    beta: vec![lambda],
308                },
309                forecast: Vec::new(),
310            };
311        }
312
313        // Initial variance estimate
314        let initial_var = returns.variance();
315        let mut variance = vec![0.0; n];
316        variance[0] = initial_var;
317
318        // EWMA recursion: σ²_t = λσ²_{t-1} + (1-λ)r²_{t-1}
319        for i in 1..n {
320            let r_sq = returns.values[i - 1].powi(2);
321            variance[i] = lambda * variance[i - 1] + (1.0 - lambda) * r_sq;
322        }
323
324        let volatility: Vec<f64> = variance.iter().map(|v| v.sqrt()).collect();
325
326        // EWMA forecast is flat (variance reverts to last estimate)
327        let last_var = *variance.last().unwrap_or(&initial_var);
328        let forecast = vec![last_var.sqrt(); forecast_horizon];
329
330        VolatilityResult {
331            variance,
332            volatility,
333            coefficients: GARCHCoefficients {
334                omega: 0.0, // EWMA has no constant
335                alpha: vec![1.0 - lambda],
336                beta: vec![lambda],
337            },
338            forecast,
339        }
340    }
341
342    /// Realized volatility (sum of squared returns).
343    ///
344    /// # Arguments
345    /// * `returns` - High-frequency returns
346    /// * `window` - Window for realized volatility calculation
347    pub fn compute_realized(returns: &TimeSeries, window: usize) -> Vec<f64> {
348        let n = returns.len();
349        let w = window.min(n).max(1);
350        let mut realized = vec![0.0; n];
351
352        for i in 0..n {
353            let start = i.saturating_sub(w - 1);
354            let sq_sum: f64 = returns.values[start..=i].iter().map(|r| r.powi(2)).sum();
355            realized[i] = (sq_sum / (i - start + 1) as f64).sqrt();
356        }
357
358        realized
359    }
360
361    /// Fit GARCH(p,q) model using simplified MLE.
362    fn fit_garch(returns: &TimeSeries, params: GARCHParams) -> GARCHCoefficients {
363        let _n = returns.len();
364        let unconditional_var = returns.variance();
365
366        // Start with reasonable initial values
367        // For GARCH(1,1): omega + alpha + beta ≈ 1 for stationarity
368        let p = params.p.max(1);
369        let q = params.q.max(1);
370
371        // Initial guesses
372        let mut omega = unconditional_var * 0.1;
373        let mut alpha = vec![0.1 / p as f64; p];
374        let mut beta = vec![0.8 / q as f64; q];
375
376        // Simple grid search optimization (simplified from full MLE)
377        let mut best_likelihood = f64::NEG_INFINITY;
378        let mut best_coeffs = (omega, alpha.clone(), beta.clone());
379
380        // Grid search over parameter space
381        for omega_scale in [0.05, 0.1, 0.15, 0.2] {
382            for alpha_sum in [0.05, 0.1, 0.15, 0.2] {
383                for beta_sum in [0.7, 0.75, 0.8, 0.85, 0.9] {
384                    // Check stationarity: alpha + beta < 1
385                    if alpha_sum + beta_sum >= 0.999 {
386                        continue;
387                    }
388
389                    let test_omega = unconditional_var * omega_scale;
390                    let test_alpha: Vec<f64> = (0..p).map(|_| alpha_sum / p as f64).collect();
391                    let test_beta: Vec<f64> = (0..q).map(|_| beta_sum / q as f64).collect();
392
393                    let test_coeffs = GARCHCoefficients {
394                        omega: test_omega,
395                        alpha: test_alpha.clone(),
396                        beta: test_beta.clone(),
397                    };
398
399                    let variance = Self::calculate_variance(returns, &test_coeffs);
400                    let likelihood = Self::log_likelihood(&returns.values, &variance);
401
402                    if likelihood > best_likelihood && likelihood.is_finite() {
403                        best_likelihood = likelihood;
404                        best_coeffs = (test_omega, test_alpha, test_beta);
405                    }
406                }
407            }
408        }
409
410        omega = best_coeffs.0;
411        alpha = best_coeffs.1;
412        beta = best_coeffs.2;
413
414        // Local refinement using gradient-free optimization
415        for _ in 0..10 {
416            let current_coeffs = GARCHCoefficients {
417                omega,
418                alpha: alpha.clone(),
419                beta: beta.clone(),
420            };
421            let current_variance = Self::calculate_variance(returns, &current_coeffs);
422            let current_ll = Self::log_likelihood(&returns.values, &current_variance);
423
424            // Try small perturbations
425            let delta = 0.01;
426            let mut improved = false;
427
428            // Perturb omega
429            for sign in [-1.0, 1.0] {
430                let new_omega = (omega + sign * delta * unconditional_var).max(1e-10);
431                let test_coeffs = GARCHCoefficients {
432                    omega: new_omega,
433                    alpha: alpha.clone(),
434                    beta: beta.clone(),
435                };
436                let test_var = Self::calculate_variance(returns, &test_coeffs);
437                let test_ll = Self::log_likelihood(&returns.values, &test_var);
438
439                if test_ll > current_ll && test_ll.is_finite() {
440                    omega = new_omega;
441                    improved = true;
442                    break;
443                }
444            }
445
446            // Perturb alpha
447            for i in 0..p {
448                for sign in [-1.0, 1.0] {
449                    let mut new_alpha = alpha.clone();
450                    new_alpha[i] = (new_alpha[i] + sign * delta).max(0.001).min(0.5);
451
452                    let alpha_sum: f64 = new_alpha.iter().sum();
453                    let beta_sum: f64 = beta.iter().sum();
454                    if alpha_sum + beta_sum >= 0.999 {
455                        continue;
456                    }
457
458                    let test_coeffs = GARCHCoefficients {
459                        omega,
460                        alpha: new_alpha.clone(),
461                        beta: beta.clone(),
462                    };
463                    let test_var = Self::calculate_variance(returns, &test_coeffs);
464                    let test_ll = Self::log_likelihood(&returns.values, &test_var);
465
466                    if test_ll > current_ll && test_ll.is_finite() {
467                        alpha = new_alpha;
468                        improved = true;
469                        break;
470                    }
471                }
472            }
473
474            // Perturb beta
475            for i in 0..q {
476                for sign in [-1.0, 1.0] {
477                    let mut new_beta = beta.clone();
478                    new_beta[i] = (new_beta[i] + sign * delta).max(0.001).min(0.99);
479
480                    let alpha_sum: f64 = alpha.iter().sum();
481                    let beta_sum: f64 = new_beta.iter().sum();
482                    if alpha_sum + beta_sum >= 0.999 {
483                        continue;
484                    }
485
486                    let test_coeffs = GARCHCoefficients {
487                        omega,
488                        alpha: alpha.clone(),
489                        beta: new_beta.clone(),
490                    };
491                    let test_var = Self::calculate_variance(returns, &test_coeffs);
492                    let test_ll = Self::log_likelihood(&returns.values, &test_var);
493
494                    if test_ll > current_ll && test_ll.is_finite() {
495                        beta = new_beta;
496                        improved = true;
497                        break;
498                    }
499                }
500            }
501
502            if !improved {
503                break;
504            }
505        }
506
507        GARCHCoefficients { omega, alpha, beta }
508    }
509
510    /// Calculate conditional variance series.
511    fn calculate_variance(returns: &TimeSeries, coeffs: &GARCHCoefficients) -> Vec<f64> {
512        let n = returns.len();
513        let p = coeffs.alpha.len();
514        let q = coeffs.beta.len();
515
516        // Initial variance
517        let init_var = returns.variance().max(1e-10);
518        let mut variance = vec![init_var; n];
519
520        let max_lag = p.max(q);
521
522        for t in max_lag..n {
523            let mut var_t = coeffs.omega;
524
525            // ARCH terms: sum(alpha_i * r²_{t-i})
526            for (i, &alpha_i) in coeffs.alpha.iter().enumerate() {
527                let r_sq = returns.values[t - i - 1].powi(2);
528                var_t += alpha_i * r_sq;
529            }
530
531            // GARCH terms: sum(beta_j * σ²_{t-j})
532            for (j, &beta_j) in coeffs.beta.iter().enumerate() {
533                var_t += beta_j * variance[t - j - 1];
534            }
535
536            variance[t] = var_t.max(1e-10);
537        }
538
539        variance
540    }
541
542    /// Gaussian log-likelihood.
543    fn log_likelihood(returns: &[f64], variance: &[f64]) -> f64 {
544        let n = returns.len();
545        let mut ll = 0.0;
546
547        for i in 0..n {
548            if variance[i] > 0.0 {
549                ll -= 0.5 * (variance[i].ln() + returns[i].powi(2) / variance[i]);
550            }
551        }
552
553        ll - (n as f64 / 2.0) * (2.0 * std::f64::consts::PI).ln()
554    }
555
556    /// Forecast volatility.
557    fn forecast_volatility(
558        returns: &TimeSeries,
559        coeffs: &GARCHCoefficients,
560        horizon: usize,
561    ) -> Vec<f64> {
562        if horizon == 0 {
563            return Vec::new();
564        }
565
566        let variance = Self::calculate_variance(returns, coeffs);
567        let _n = variance.len();
568
569        // Long-run variance: omega / (1 - alpha - beta)
570        let alpha_sum: f64 = coeffs.alpha.iter().sum();
571        let beta_sum: f64 = coeffs.beta.iter().sum();
572        let persistence = alpha_sum + beta_sum;
573
574        let long_run_var = if persistence < 0.999 {
575            coeffs.omega / (1.0 - persistence)
576        } else {
577            returns.variance()
578        };
579
580        let mut forecast = Vec::with_capacity(horizon);
581        let last_var = *variance.last().unwrap_or(&long_run_var);
582        let last_r_sq = returns
583            .values
584            .last()
585            .map(|r| r.powi(2))
586            .unwrap_or(long_run_var);
587
588        // One-step forecast
589        let mut h1_var = coeffs.omega;
590        if !coeffs.alpha.is_empty() {
591            h1_var += coeffs.alpha[0] * last_r_sq;
592        }
593        if !coeffs.beta.is_empty() {
594            h1_var += coeffs.beta[0] * last_var;
595        }
596        forecast.push(h1_var.sqrt());
597
598        // Multi-step forecasts converge to long-run volatility
599        let mut prev_var = h1_var;
600        for _h in 1..horizon {
601            // E[σ²_{t+h}] = ω + (α+β)E[σ²_{t+h-1}]
602            // Converges to long_run_var as h → ∞
603            let h_var = coeffs.omega + persistence * prev_var;
604            forecast.push(h_var.sqrt());
605            prev_var = h_var;
606        }
607
608        forecast
609    }
610
611    /// Parkinson volatility estimator (high-low range).
612    pub fn parkinson_volatility(high: &[f64], low: &[f64]) -> Vec<f64> {
613        let factor = 1.0 / (4.0 * 2.0_f64.ln());
614
615        high.iter()
616            .zip(low.iter())
617            .map(|(&h, &l)| {
618                if h > l && h > 0.0 && l > 0.0 {
619                    (factor * (h / l).ln().powi(2)).sqrt()
620                } else {
621                    0.0
622                }
623            })
624            .collect()
625    }
626
627    /// Garman-Klass volatility estimator (OHLC data).
628    pub fn garman_klass_volatility(
629        open: &[f64],
630        high: &[f64],
631        low: &[f64],
632        close: &[f64],
633    ) -> Vec<f64> {
634        let n = open.len().min(high.len()).min(low.len()).min(close.len());
635
636        (0..n)
637            .map(|i| {
638                let o = open[i];
639                let h = high[i];
640                let l = low[i];
641                let c = close[i];
642
643                if h > 0.0 && l > 0.0 && o > 0.0 && c > 0.0 {
644                    let hl = (h / l).ln();
645                    let co = (c / o).ln();
646                    (0.5 * hl.powi(2) - (2.0 * 2.0_f64.ln() - 1.0) * co.powi(2)).sqrt()
647                } else {
648                    0.0
649                }
650            })
651            .collect()
652    }
653}
654
655impl GpuKernel for VolatilityAnalysis {
656    fn metadata(&self) -> &KernelMetadata {
657        &self.metadata
658    }
659}
660
661#[async_trait]
662impl BatchKernel<VolatilityAnalysisInput, VolatilityAnalysisOutput> for VolatilityAnalysis {
663    async fn execute(&self, input: VolatilityAnalysisInput) -> Result<VolatilityAnalysisOutput> {
664        let start = Instant::now();
665        let result = Self::compute(&input.returns, input.params, input.forecast_horizon);
666        Ok(VolatilityAnalysisOutput {
667            result,
668            compute_time_us: start.elapsed().as_micros() as u64,
669        })
670    }
671}
672
673#[async_trait]
674impl BatchKernel<EWMAVolatilityInput, EWMAVolatilityOutput> for VolatilityAnalysis {
675    async fn execute(&self, input: EWMAVolatilityInput) -> Result<EWMAVolatilityOutput> {
676        let start = Instant::now();
677        let result = Self::compute_ewma(&input.returns, input.lambda, input.forecast_horizon);
678        Ok(EWMAVolatilityOutput {
679            result,
680            compute_time_us: start.elapsed().as_micros() as u64,
681        })
682    }
683}
684
685// ============================================================================
686// Ring Kernel Handler Implementations
687// ============================================================================
688
689/// Ring handler for GARCH volatility updates.
690#[async_trait]
691impl RingKernelHandler<UpdateVolatilityRing, UpdateVolatilityResponse> for VolatilityAnalysis {
692    async fn handle(
693        &self,
694        _ctx: &mut RingContext,
695        msg: UpdateVolatilityRing,
696    ) -> Result<UpdateVolatilityResponse> {
697        // Update GARCH state with new observation
698        let return_value = msg.return_f64();
699        let (volatility, variance, observation_count) =
700            self.update_asset(msg.asset_id, return_value);
701
702        Ok(UpdateVolatilityResponse {
703            correlation_id: msg.correlation_id,
704            asset_id: msg.asset_id,
705            current_volatility: (volatility * 100_000_000.0) as i64,
706            current_variance: (variance * 100_000_000.0) as i64,
707            observation_count: observation_count as u32,
708        })
709    }
710}
711
712/// Ring handler for volatility queries.
713#[async_trait]
714impl RingKernelHandler<QueryVolatilityRing, QueryVolatilityResponse> for VolatilityAnalysis {
715    async fn handle(
716        &self,
717        _ctx: &mut RingContext,
718        msg: QueryVolatilityRing,
719    ) -> Result<QueryVolatilityResponse> {
720        let horizon = msg.horizon.min(10) as usize;
721
722        // Query internal state for this asset
723        let (current_vol, forecast_vec, persistence) =
724            self.query_asset(msg.asset_id, horizon).unwrap_or_else(|| {
725                // Default values if asset not found
726                let default_vol: f64 = 0.02;
727                let default_persistence: f64 = 0.90;
728                let forecast: Vec<f64> = (0..horizon)
729                    .map(|i| default_vol * default_persistence.powi(i as i32))
730                    .collect();
731                (default_vol, forecast, default_persistence)
732            });
733
734        // Convert forecast to fixed-point array
735        let mut forecast = [0i64; 10];
736        for (i, &vol) in forecast_vec.iter().take(10).enumerate() {
737            forecast[i] = (vol * 100_000_000.0) as i64;
738        }
739
740        Ok(QueryVolatilityResponse {
741            correlation_id: msg.correlation_id,
742            asset_id: msg.asset_id,
743            current_volatility: (current_vol * 100_000_000.0) as i64,
744            forecast,
745            forecast_count: forecast_vec.len().min(10) as u8,
746            persistence: (persistence * 10000.0) as i32,
747        })
748    }
749}
750
751/// Ring handler for EWMA volatility updates.
752#[async_trait]
753impl RingKernelHandler<UpdateEWMAVolatilityRing, UpdateEWMAVolatilityResponse>
754    for VolatilityAnalysis
755{
756    async fn handle(
757        &self,
758        _ctx: &mut RingContext,
759        msg: UpdateEWMAVolatilityRing,
760    ) -> Result<UpdateEWMAVolatilityResponse> {
761        // Update EWMA state with new observation
762        let return_value = msg.return_value as f64 / 100_000_000.0;
763        let lambda = msg.lambda_f64();
764
765        // Update running EWMA state
766        let (ewma_variance, ewma_volatility) = self.update_ewma(msg.asset_id, return_value, lambda);
767
768        Ok(UpdateEWMAVolatilityResponse {
769            correlation_id: msg.correlation_id,
770            asset_id: msg.asset_id,
771            ewma_variance: (ewma_variance * 100_000_000.0) as i64,
772            ewma_volatility: (ewma_volatility * 100_000_000.0) as i64,
773        })
774    }
775}
776
777#[cfg(test)]
778mod tests {
779    use super::*;
780
781    fn create_volatile_series() -> TimeSeries {
782        // Simulate returns with clustering volatility
783        let mut values = Vec::with_capacity(200);
784        let mut vol = 0.01;
785
786        for i in 0..200 {
787            // GARCH-like volatility clustering
788            let shock = if i % 20 < 5 { 0.03 } else { 0.01 };
789            vol =
790                0.01 + 0.1 * values.last().map(|r: &f64| r.powi(2)).unwrap_or(0.0001) + 0.85 * vol;
791            vol = vol.max(0.001);
792
793            // Generate return
794            let r = vol.sqrt() * ((i as f64 * 0.1).sin() * 2.0 - 1.0 + shock);
795            values.push(r);
796        }
797
798        TimeSeries::new(values)
799    }
800
801    #[test]
802    fn test_volatility_metadata() {
803        let kernel = VolatilityAnalysis::new();
804        assert_eq!(kernel.metadata().id, "temporal/volatility-analysis");
805        assert_eq!(kernel.metadata().domain, Domain::TemporalAnalysis);
806    }
807
808    #[test]
809    fn test_garch_estimation() {
810        let returns = create_volatile_series();
811        let params = GARCHParams::new(1, 1);
812        let result = VolatilityAnalysis::compute(&returns, params, 10);
813
814        // Should have variance for each observation
815        assert_eq!(result.variance.len(), returns.len());
816        assert_eq!(result.volatility.len(), returns.len());
817
818        // Variance should be positive
819        assert!(result.variance.iter().all(|&v| v > 0.0));
820
821        // Should have forecasts
822        assert_eq!(result.forecast.len(), 10);
823
824        // Coefficients should satisfy stationarity
825        let alpha_sum: f64 = result.coefficients.alpha.iter().sum();
826        let beta_sum: f64 = result.coefficients.beta.iter().sum();
827        assert!(
828            alpha_sum + beta_sum < 1.0,
829            "Not stationary: alpha={}, beta={}",
830            alpha_sum,
831            beta_sum
832        );
833    }
834
835    #[test]
836    fn test_ewma_volatility() {
837        let returns = create_volatile_series();
838        let result = VolatilityAnalysis::compute_ewma(&returns, 0.94, 5);
839
840        assert_eq!(result.variance.len(), returns.len());
841        assert_eq!(result.forecast.len(), 5);
842
843        // EWMA coefficients should reflect lambda
844        assert!((result.coefficients.beta[0] - 0.94).abs() < 0.01);
845        assert!((result.coefficients.alpha[0] - 0.06).abs() < 0.01);
846    }
847
848    #[test]
849    fn test_realized_volatility() {
850        let returns = create_volatile_series();
851        let realized = VolatilityAnalysis::compute_realized(&returns, 20);
852
853        assert_eq!(realized.len(), returns.len());
854        assert!(realized.iter().all(|&v| v >= 0.0));
855    }
856
857    #[test]
858    fn test_parkinson_volatility() {
859        let high = vec![105.0, 106.0, 107.0, 108.0, 109.0];
860        let low = vec![95.0, 94.0, 93.0, 92.0, 91.0];
861
862        let vol = VolatilityAnalysis::parkinson_volatility(&high, &low);
863
864        assert_eq!(vol.len(), 5);
865        assert!(vol.iter().all(|&v| v > 0.0));
866    }
867
868    #[test]
869    fn test_garman_klass_volatility() {
870        let open = vec![100.0, 101.0, 102.0];
871        let high = vec![105.0, 106.0, 107.0];
872        let low = vec![95.0, 96.0, 97.0];
873        let close = vec![102.0, 103.0, 104.0];
874
875        let vol = VolatilityAnalysis::garman_klass_volatility(&open, &high, &low, &close);
876
877        assert_eq!(vol.len(), 3);
878        assert!(vol.iter().all(|&v| v >= 0.0));
879    }
880
881    #[test]
882    fn test_volatility_forecast_convergence() {
883        let returns = create_volatile_series();
884        let params = GARCHParams::new(1, 1);
885        let result = VolatilityAnalysis::compute(&returns, params, 100);
886
887        // Forecasts should converge to long-run volatility
888        // Rate depends on persistence (alpha+beta), so use longer horizon
889        let last_forecasts = &result.forecast[80..100];
890        let max_diff = last_forecasts
891            .windows(2)
892            .map(|w| (w[1] - w[0]).abs())
893            .fold(0.0_f64, f64::max);
894
895        // Convergence can be slow for high persistence, allow larger tolerance
896        assert!(
897            max_diff < 1.0,
898            "Forecasts not converging: max_diff={}",
899            max_diff
900        );
901
902        // Also verify that forecasts are positive and decreasing variance in changes
903        assert!(result.forecast.iter().all(|&v| v > 0.0));
904    }
905
906    #[test]
907    fn test_short_series() {
908        let short = TimeSeries::new(vec![0.01, -0.02, 0.015]);
909        let result = VolatilityAnalysis::compute(&short, GARCHParams::new(1, 1), 5);
910
911        // Should handle gracefully
912        assert_eq!(result.variance.len(), 3);
913        assert_eq!(result.forecast.len(), 5);
914    }
915
916    #[test]
917    fn test_empty_series() {
918        let empty = TimeSeries::new(Vec::new());
919        let result = VolatilityAnalysis::compute_ewma(&empty, 0.94, 5);
920
921        assert!(result.variance.is_empty());
922        assert!(result.forecast.is_empty());
923    }
924}