use std::time::Instant;
use async_trait::async_trait;
use crate::messages::{
ARIMAForecastInput, ARIMAForecastOutput, ProphetDecompositionInput, ProphetDecompositionOutput,
};
use crate::types::{ARIMAParams, ARIMAResult, ProphetResult, TimeSeries};
use rustkernel_core::{
domain::Domain,
error::Result,
kernel::KernelMetadata,
traits::{BatchKernel, GpuKernel},
};
/// Batch kernel that fits ARIMA(p, d, q) models to a time series and
/// produces multi-step forecasts. Registered as `temporal/arima-forecast`.
#[derive(Debug, Clone)]
pub struct ARIMAForecast {
    // Registry metadata: kernel id, domain, and throughput/latency hints.
    metadata: KernelMetadata,
}
impl Default for ARIMAForecast {
fn default() -> Self {
Self::new()
}
}
impl ARIMAForecast {
    /// Builds the kernel with its registry metadata.
    #[must_use]
    pub fn new() -> Self {
        Self {
            metadata: KernelMetadata::batch("temporal/arima-forecast", Domain::TemporalAnalysis)
                .with_description("ARIMA model fitting and forecasting")
                .with_throughput(10_000)
                .with_latency_us(100.0),
        }
    }

    /// Fits an ARIMA(p, d, q) model to `series` and forecasts `horizon` steps.
    ///
    /// Pipeline:
    /// 1. Difference the series `d` times.
    /// 2. Fit AR(p) coefficients via Yule-Walker / Levinson-Durbin.
    /// 3. Estimate MA(q) coefficients from the AR residuals.
    /// 4. Integrate fitted values and forecasts back to the original scale.
    ///
    /// Degenerate inputs return early: an empty series yields empty vectors
    /// with `aic = INFINITY`; a series too short for the requested orders
    /// yields zero coefficients and a flat mean forecast.
    pub fn compute(series: &TimeSeries, params: ARIMAParams, horizon: usize) -> ARIMAResult {
        if series.is_empty() {
            return ARIMAResult {
                ar_coefficients: Vec::new(),
                ma_coefficients: Vec::new(),
                intercept: 0.0,
                fitted: Vec::new(),
                residuals: Vec::new(),
                forecast: Vec::new(),
                aic: f64::INFINITY,
            };
        }
        // Apply d rounds of first differencing to remove trend.
        let mut diff_series = series.values.clone();
        for _ in 0..params.d {
            diff_series = Self::difference(&diff_series);
        }
        // Too few observations left to estimate the requested orders:
        // fall back to a constant (mean) model.
        if diff_series.len() < params.p.max(params.q) + 1 {
            return ARIMAResult {
                ar_coefficients: vec![0.0; params.p],
                ma_coefficients: vec![0.0; params.q],
                intercept: series.mean(),
                fitted: series.values.clone(),
                residuals: vec![0.0; series.len()],
                forecast: vec![series.mean(); horizon],
                aic: f64::INFINITY,
            };
        }
        let ar_coefficients = if params.p > 0 {
            Self::fit_ar(&diff_series, params.p)
        } else {
            Vec::new()
        };
        // One-step AR predictions on the differenced series; their errors
        // are the innovations used to estimate the MA part.
        let ar_fitted = Self::apply_ar(&diff_series, &ar_coefficients);
        let residuals: Vec<f64> = diff_series
            .iter()
            .zip(ar_fitted.iter())
            .map(|(y, yhat)| y - yhat)
            .collect();
        let ma_coefficients = if params.q > 0 {
            Self::fit_ma(&residuals, params.q)
        } else {
            Vec::new()
        };
        // Intercept is the mean of the differenced series.
        let intercept = diff_series.iter().sum::<f64>() / diff_series.len() as f64;
        let fitted =
            Self::generate_fitted(&diff_series, &ar_coefficients, &ma_coefficients, intercept);
        let fitted_integrated = Self::integrate(&fitted, &series.values, params.d);
        // Residuals on the original (undifferenced) scale, used for the AIC.
        let final_residuals: Vec<f64> = series
            .values
            .iter()
            .zip(fitted_integrated.iter())
            .map(|(y, yhat)| y - yhat)
            .collect();
        let forecast = Self::forecast_ahead(
            &diff_series,
            &ar_coefficients,
            &ma_coefficients,
            intercept,
            horizon,
        );
        let forecast_integrated = Self::integrate_forecast(&forecast, &series.values, params.d);
        // Gaussian-likelihood AIC: n * ln(RSS / n) + 2k, where k = p + q + 1
        // (the +1 accounts for the intercept).
        let n = series.len() as f64;
        let k = (params.p + params.q + 1) as f64;
        let rss: f64 = final_residuals.iter().map(|r| r.powi(2)).sum();
        let aic = n * (rss / n).ln() + 2.0 * k;
        ARIMAResult {
            ar_coefficients,
            ma_coefficients,
            intercept,
            fitted: fitted_integrated,
            residuals: final_residuals,
            forecast: forecast_integrated,
            aic,
        }
    }

    /// First difference: `x[t+1] - x[t]`. Empty output for < 2 points.
    fn difference(series: &[f64]) -> Vec<f64> {
        if series.len() < 2 {
            return Vec::new();
        }
        series.windows(2).map(|w| w[1] - w[0]).collect()
    }

    /// Estimates AR(p) coefficients from the sample autocorrelation function
    /// via Levinson-Durbin. Returns zeros when the series is too short or
    /// numerically constant (variance below 1e-10).
    fn fit_ar(series: &[f64], p: usize) -> Vec<f64> {
        let n = series.len();
        if n <= p {
            return vec![0.0; p];
        }
        let mean: f64 = series.iter().sum::<f64>() / n as f64;
        let var: f64 = series.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n as f64;
        if var < 1e-10 {
            // Effectively constant series: autocorrelations are undefined.
            return vec![0.0; p];
        }
        // Sample autocorrelations r_0..r_p (biased estimator: divides by n).
        let mut acf = vec![1.0; p + 1];
        for k in 1..=p {
            let cov: f64 = (0..n - k)
                .map(|i| (series[i] - mean) * (series[i + k] - mean))
                .sum::<f64>()
                / n as f64;
            acf[k] = cov / var;
        }
        Self::levinson_durbin(&acf, p)
    }

    /// Solves the Yule-Walker equations recursively (Levinson-Durbin) and
    /// returns the order-`p` AR coefficients. `acf[0..=p]` must hold the
    /// autocorrelations with `acf[0] == 1`.
    ///
    /// NOTE(review): if a reflection coefficient reaches +/-1 the prediction
    /// variance `sigma` collapses to 0 and the next iteration divides by
    /// zero (yielding NaN/inf coefficients). The variance check in `fit_ar`
    /// makes this unlikely, but it is not guarded here — confirm acceptable.
    fn levinson_durbin(acf: &[f64], p: usize) -> Vec<f64> {
        let mut phi = vec![vec![0.0; p + 1]; p + 1];
        let mut sigma = vec![0.0; p + 1];
        sigma[0] = acf[0];
        for k in 1..=p {
            // Reflection coefficient for order k.
            let mut num = acf[k];
            for j in 1..k {
                num -= phi[k - 1][j] * acf[k - j];
            }
            phi[k][k] = num / sigma[k - 1];
            // Update the lower-order coefficients.
            for j in 1..k {
                phi[k][j] = phi[k - 1][j] - phi[k][k] * phi[k - 1][k - j];
            }
            sigma[k] = sigma[k - 1] * (1.0 - phi[k][k].powi(2));
        }
        (1..=p).map(|j| phi[p][j]).collect()
    }

    /// One-step-ahead AR predictions. The first `p` entries stay 0.0 because
    /// they lack enough history.
    fn apply_ar(series: &[f64], coefficients: &[f64]) -> Vec<f64> {
        let n = series.len();
        let p = coefficients.len();
        let mut fitted = vec![0.0; n];
        for i in p..n {
            for (j, &coef) in coefficients.iter().enumerate() {
                fitted[i] += coef * series[i - j - 1];
            }
        }
        fitted
    }

    /// Estimates MA(q) coefficients from the AR residuals: seeds them with
    /// clamped lag autocorrelations, then refines by coordinate descent on
    /// the innovation sum of squares (bounded to keep the model invertible).
    /// Returns zeros when there are too few residuals or negligible variance.
    fn fit_ma(residuals: &[f64], q: usize) -> Vec<f64> {
        let n = residuals.len();
        // Require a margin of observations beyond q for a meaningful fit.
        if n <= q + 10 {
            return vec![0.0; q];
        }
        let var: f64 = residuals.iter().map(|x| x.powi(2)).sum::<f64>() / n as f64;
        if var < 1e-10 {
            return vec![0.0; q];
        }
        let mean: f64 = residuals.iter().sum::<f64>() / n as f64;
        let centered: Vec<f64> = residuals.iter().map(|&x| x - mean).collect();
        // Initial guesses: lag-k autocorrelations, clamped inside (-0.9, 0.9).
        let mut ma_coefs: Vec<f64> = Vec::with_capacity(q);
        let c0: f64 = centered.iter().map(|x| x.powi(2)).sum::<f64>() / n as f64;
        for k in 1..=q {
            if c0 > 1e-10 {
                let ck: f64 = (0..n - k)
                    .map(|i| centered[i] * centered[i + k])
                    .sum::<f64>()
                    / n as f64;
                ma_coefs.push((ck / c0).clamp(-0.9, 0.9));
            } else {
                ma_coefs.push(0.0);
            }
        }
        // Coordinate descent: perturb one coefficient at a time, keep the
        // move only if it lowers the innovation SSE and the coefficients
        // stay inside the invertibility-ish budget (sum of |theta| < 0.99).
        let mut best_sse = Self::calculate_ma_sse(residuals, &ma_coefs);
        for _iter in 0..20 {
            let mut improved = false;
            for j in 0..q {
                let original = ma_coefs[j];
                // Smaller steps as the coefficient approaches +/-1.
                let step_size = 0.05 * (1.0 - original.abs()).max(0.1);
                for delta in [-step_size, step_size, -step_size * 0.5, step_size * 0.5] {
                    let new_val = (original + delta).clamp(-0.95, 0.95);
                    ma_coefs[j] = new_val;
                    let sum_abs: f64 = ma_coefs.iter().map(|c| c.abs()).sum();
                    if sum_abs >= 0.99 {
                        ma_coefs[j] = original;
                        continue;
                    }
                    let new_sse = Self::calculate_ma_sse(residuals, &ma_coefs);
                    if new_sse < best_sse && new_sse.is_finite() {
                        best_sse = new_sse;
                        improved = true;
                        break;
                    } else {
                        ma_coefs[j] = original;
                    }
                }
            }
            if !improved {
                break;
            }
        }
        ma_coefs
    }

    /// Sum of squared innovations for an MA(q) model: filters the residuals
    /// through e_t = r_t - sum_j theta_j * e_{t-j-1}, skipping the first `q`
    /// warm-up innovations in the sum.
    fn calculate_ma_sse(residuals: &[f64], ma_coefs: &[f64]) -> f64 {
        let n = residuals.len();
        let q = ma_coefs.len();
        let mut innovations = vec![0.0; n];
        for t in 0..n {
            let mut innovation = residuals[t];
            for j in 0..q {
                if t > j {
                    innovation -= ma_coefs[j] * innovations[t - j - 1];
                }
            }
            innovations[t] = innovation;
        }
        innovations[q..].iter().map(|e| e.powi(2)).sum()
    }

    /// One-step-ahead ARMA fitted values on the differenced series.
    /// The first `max(p, q)` entries are initialized to the series mean.
    fn generate_fitted(
        series: &[f64],
        ar_coefs: &[f64],
        ma_coefs: &[f64],
        intercept: f64,
    ) -> Vec<f64> {
        let n = series.len();
        let p = ar_coefs.len();
        let q = ma_coefs.len();
        let start = p.max(q);
        let mut fitted = vec![series.iter().sum::<f64>() / n as f64; n];
        let mut errors = vec![0.0; n];
        for i in start..n {
            let mut yhat = intercept;
            for (j, &coef) in ar_coefs.iter().enumerate() {
                yhat += coef * series[i - j - 1];
            }
            for (j, &coef) in ma_coefs.iter().enumerate() {
                if i > j {
                    yhat += coef * errors[i - j - 1];
                }
            }
            fitted[i] = yhat;
            errors[i] = series[i] - yhat;
        }
        fitted
    }

    /// Undoes `d` rounds of differencing on fitted values by cumulative
    /// summation, anchoring level `i` at `original[i]`, then truncating to
    /// the original length.
    fn integrate(diff_fitted: &[f64], original: &[f64], d: usize) -> Vec<f64> {
        if d == 0 || original.is_empty() {
            return diff_fitted.to_vec();
        }
        let mut result = diff_fitted.to_vec();
        for i in 0..d {
            let start_val = if i < original.len() { original[i] } else { 0.0 };
            let mut integrated = vec![start_val];
            for &diff in &result {
                integrated.push(integrated.last().unwrap() + diff);
            }
            result = integrated;
        }
        result.truncate(original.len());
        result
    }

    /// Iterated one-step forecasts on the differenced scale using only the
    /// AR part (each forecast is fed back as input for the next step).
    ///
    /// NOTE(review): the MA coefficients are deliberately ignored here
    /// (`_ma_coefs`). Beyond `q` steps their expected contribution is zero,
    /// but the first `q` forecasts would normally include the last observed
    /// innovations — confirm this simplification is intended.
    fn forecast_ahead(
        series: &[f64],
        ar_coefs: &[f64],
        _ma_coefs: &[f64],
        intercept: f64,
        horizon: usize,
    ) -> Vec<f64> {
        let mut forecasts = Vec::with_capacity(horizon);
        let mut extended = series.to_vec();
        for _ in 0..horizon {
            let mut yhat = intercept;
            for (j, &coef) in ar_coefs.iter().enumerate() {
                // saturating_sub guards the (unexpected) case of a history
                // shorter than the AR order.
                let idx = extended.len().saturating_sub(j + 1);
                yhat += coef * extended[idx];
            }
            forecasts.push(yhat);
            extended.push(yhat);
        }
        forecasts
    }

    /// Integrates differenced forecasts back to the original scale by
    /// cumulative summation anchored at the last observed value; the anchor
    /// itself is dropped so the output stays `horizon` long.
    ///
    /// NOTE(review): for d > 1 every integration level is re-anchored at the
    /// same last observed value rather than the last value of each
    /// difference level — an approximation; verify for d >= 2 use cases.
    fn integrate_forecast(forecasts: &[f64], original: &[f64], d: usize) -> Vec<f64> {
        if d == 0 || original.is_empty() {
            return forecasts.to_vec();
        }
        let mut result = forecasts.to_vec();
        let last_val = *original.last().unwrap_or(&0.0);
        for _ in 0..d {
            let mut integrated = vec![last_val];
            for &diff in &result {
                integrated.push(integrated.last().unwrap() + diff);
            }
            result = integrated[1..].to_vec();
        }
        result
    }
}
impl GpuKernel for ARIMAForecast {
    /// Returns the kernel's registry metadata.
    fn metadata(&self) -> &KernelMetadata {
        &self.metadata
    }
}
#[async_trait]
impl BatchKernel<ARIMAForecastInput, ARIMAForecastOutput> for ARIMAForecast {
    /// Runs the ARIMA fit/forecast for a batch input and records the
    /// wall-clock compute time in microseconds.
    async fn execute(&self, input: ARIMAForecastInput) -> Result<ARIMAForecastOutput> {
        let timer = Instant::now();
        let result = Self::compute(&input.series, input.params, input.horizon);
        let compute_time_us = timer.elapsed().as_micros() as u64;
        Ok(ARIMAForecastOutput {
            result,
            compute_time_us,
        })
    }
}
/// Batch kernel performing a Prophet-style additive decomposition
/// (trend + optional seasonality + residual) with a simple forecast.
/// Registered as `temporal/prophet-decomposition`.
#[derive(Debug, Clone)]
pub struct ProphetDecomposition {
    // Registry metadata: kernel id, domain, and throughput/latency hints.
    metadata: KernelMetadata,
}
impl Default for ProphetDecomposition {
fn default() -> Self {
Self::new()
}
}
impl ProphetDecomposition {
    /// Builds the kernel with its registry metadata.
    #[must_use]
    pub fn new() -> Self {
        Self {
            metadata: KernelMetadata::batch(
                "temporal/prophet-decomposition",
                Domain::TemporalAnalysis,
            )
            .with_description("Prophet-style trend/seasonal decomposition")
            .with_throughput(5_000)
            .with_latency_us(200.0),
        }
    }

    /// Decomposes `series` into trend + (optional) seasonal + residual and
    /// forecasts `horizon` steps ahead.
    ///
    /// `period` serves double duty: it is both the seasonal cycle length and
    /// the moving-average window used for trend extraction. With `None`, the
    /// window degenerates to 1, so the trend equals the raw series and the
    /// residuals are all zero. Seasonality is only extracted when the period
    /// exceeds 1 and the series is longer than one full cycle.
    pub fn compute(series: &TimeSeries, period: Option<usize>, horizon: usize) -> ProphetResult {
        if series.is_empty() {
            return ProphetResult {
                trend: Vec::new(),
                seasonal: None,
                holidays: None,
                residuals: Vec::new(),
                forecast: Vec::new(),
            };
        }
        let n = series.len();
        let window = period.unwrap_or(1);
        let trend = Self::extract_trend(&series.values, window);
        let seasonal = if let Some(p) = period {
            if p > 1 && n > p {
                Some(Self::extract_seasonal(&series.values, &trend, p))
            } else {
                None
            }
        } else {
            None
        };
        // Residual = observation - trend - seasonal component for its phase.
        let residuals: Vec<f64> = series
            .values
            .iter()
            .enumerate()
            .map(|(i, &y)| {
                let t = trend[i];
                let s = seasonal.as_ref().map(|s| s[i % s.len()]).unwrap_or(0.0);
                y - t - s
            })
            .collect();
        let forecast = Self::forecast(&trend, seasonal.as_ref(), &residuals, horizon);
        ProphetResult {
            trend,
            seasonal,
            holidays: None,
            residuals,
            forecast,
        }
    }

    /// Centered moving average of half-width `window / 2`; the averaging
    /// window is clamped at both edges of the series.
    #[allow(clippy::needless_range_loop)]
    fn extract_trend(values: &[f64], window: usize) -> Vec<f64> {
        let n = values.len();
        let w = window.max(1);
        let half_w = w / 2;
        let mut trend = vec![0.0; n];
        for i in 0..n {
            let start = i.saturating_sub(half_w);
            let end = (i + half_w + 1).min(n);
            let count = end - start;
            trend[i] = values[start..end].iter().sum::<f64>() / count as f64;
        }
        trend
    }

    /// Seasonal component: mean detrended value per phase of the cycle,
    /// centered so the `period` seasonal effects sum to zero.
    fn extract_seasonal(values: &[f64], trend: &[f64], period: usize) -> Vec<f64> {
        let detrended: Vec<f64> = values
            .iter()
            .zip(trend.iter())
            .map(|(v, t)| v - t)
            .collect();
        // Accumulate per-phase sums and counts, then average.
        let mut seasonal = vec![0.0; period];
        let mut counts = vec![0usize; period];
        for (i, &d) in detrended.iter().enumerate() {
            let s = i % period;
            seasonal[s] += d;
            counts[s] += 1;
        }
        for (s, &c) in counts.iter().enumerate() {
            if c > 0 {
                seasonal[s] /= c as f64;
            }
        }
        // Center so the seasonal effects are mean-zero.
        let mean: f64 = seasonal.iter().sum::<f64>() / period as f64;
        for s in &mut seasonal {
            *s -= mean;
        }
        seasonal
    }

    /// Forecast = linear trend extrapolation (slope from the last two trend
    /// points) plus the continued seasonal cycle. Residuals are currently
    /// unused (`_residuals`); fewer than two trend points yields a flat
    /// forecast at the last trend value.
    fn forecast(
        trend: &[f64],
        seasonal: Option<&Vec<f64>>,
        _residuals: &[f64],
        horizon: usize,
    ) -> Vec<f64> {
        let n = trend.len();
        if n < 2 {
            return vec![trend.last().copied().unwrap_or(0.0); horizon];
        }
        let slope = trend[n - 1] - trend[n - 2];
        let last_trend = trend[n - 1];
        let mut forecasts = Vec::with_capacity(horizon);
        for h in 1..=horizon {
            let trend_forecast = last_trend + slope * h as f64;
            // (n + h - 1) % len continues the in-sample phase sequence.
            let seasonal_forecast = seasonal.map(|s| s[(n + h - 1) % s.len()]).unwrap_or(0.0);
            forecasts.push(trend_forecast + seasonal_forecast);
        }
        forecasts
    }
}
impl GpuKernel for ProphetDecomposition {
    /// Returns the kernel's registry metadata.
    fn metadata(&self) -> &KernelMetadata {
        &self.metadata
    }
}
#[async_trait]
impl BatchKernel<ProphetDecompositionInput, ProphetDecompositionOutput> for ProphetDecomposition {
    /// Runs the decomposition for a batch input and records the wall-clock
    /// compute time in microseconds.
    async fn execute(
        &self,
        input: ProphetDecompositionInput,
    ) -> Result<ProphetDecompositionOutput> {
        let timer = Instant::now();
        let result = Self::compute(&input.series, input.period, input.horizon);
        let compute_time_us = timer.elapsed().as_micros() as u64;
        Ok(ProphetDecompositionOutput {
            result,
            compute_time_us,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Linear series: y_t = 10 + 2t for t in 0..50.
    fn create_trend_series() -> TimeSeries {
        TimeSeries::new((0..50).map(|t| 10.0 + 2.0 * t as f64).collect())
    }

    /// 60 points of upward trend plus a sine wave with period 12.
    fn create_seasonal_series() -> TimeSeries {
        let period = 12;
        let values: Vec<f64> = (0..60)
            .map(|t| {
                let base = 100.0 + 0.5 * t as f64;
                let wave =
                    10.0 * ((2.0 * std::f64::consts::PI * t as f64 / period as f64).sin());
                base + wave
            })
            .collect();
        TimeSeries::new(values)
    }

    #[test]
    fn test_arima_metadata() {
        let kernel = ARIMAForecast::new();
        assert_eq!(kernel.metadata().id, "temporal/arima-forecast");
        assert_eq!(kernel.metadata().domain, Domain::TemporalAnalysis);
    }

    #[test]
    fn test_arima_forecast_trend() {
        let series = create_trend_series();
        let params = ARIMAParams::new(1, 1, 0);
        // Read the AR order before the params value moves into compute.
        let ar_order = params.p;
        let result = ARIMAForecast::compute(&series, params, 5);
        assert_eq!(result.forecast.len(), 5);
        assert!(!result.ar_coefficients.is_empty() || ar_order == 0);
        // A strongly trending series should not forecast far below its end.
        let last = *series.values.last().unwrap();
        for predicted in &result.forecast {
            assert!(*predicted > last * 0.8);
        }
    }

    #[test]
    fn test_prophet_metadata() {
        let kernel = ProphetDecomposition::new();
        assert_eq!(kernel.metadata().id, "temporal/prophet-decomposition");
    }

    #[test]
    fn test_prophet_decomposition() {
        let series = create_seasonal_series();
        let result = ProphetDecomposition::compute(&series, Some(12), 12);
        assert_eq!(result.trend.len(), series.len());
        assert!(result.seasonal.is_some());
        assert_eq!(result.seasonal.as_ref().unwrap().len(), 12);
        assert_eq!(result.forecast.len(), 12);
    }

    #[test]
    fn test_prophet_no_seasonality() {
        let series = create_trend_series();
        let result = ProphetDecomposition::compute(&series, None, 5);
        assert!(result.seasonal.is_none());
        assert_eq!(result.forecast.len(), 5);
    }

    #[test]
    fn test_empty_series() {
        let empty = TimeSeries::new(Vec::new());
        let arima = ARIMAForecast::compute(&empty, ARIMAParams::new(1, 0, 0), 5);
        assert!(arima.forecast.is_empty());
        let prophet = ProphetDecomposition::compute(&empty, Some(12), 5);
        assert!(prophet.forecast.is_empty());
    }
}