1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use polars::{
prelude::{EWMOptions, LazyFrame, RollingOptionsFixedWindow, SortMultipleOptions, col, lit},
series::ops::NullBehavior,
};
use serde::{Deserialize, Serialize};
use crate::{error::ChapatyResult, transport::schema::CanonicalCol};
/// Window length for an exponential moving average.
///
/// Newtype over `u16`; the wrapped value is the EMA span consumed by
/// `pre_compute_ema` (smoothing factor `2 / (span + 1)`) and is also used
/// there as the warm-up length before any value is emitted.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct EmaWindow(pub u16);
/// Window length for a simple (unweighted) moving average.
///
/// Newtype over `u16`; the wrapped value is used by `pre_compute_sma` both as
/// the rolling window size and as the minimum number of observations required
/// before a value is emitted.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct SmaWindow(pub u16);
/// Window length for the Relative Strength Index.
///
/// Newtype over `u16`; the wrapped value `N` is used by `pre_compute_rsi` to
/// derive the smoothing factor `1 / N` and as the warm-up length before any
/// value is emitted.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct RsiWindow(pub u16);
/// A technical indicator together with its window configuration.
///
/// Each variant wraps the window newtype of the corresponding indicator.
/// The type is `Copy`, hashable and (de)serializable via serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TechnicalIndicator {
/// Exponential moving average with the given span.
Ema(EmaWindow),
/// Simple moving average with the given window size.
Sma(SmaWindow),
/// Relative Strength Index with the given window.
Rsi(RsiWindow),
}
impl EmaWindow {
pub fn pre_compute_ema(&self, lf: LazyFrame) -> ChapatyResult<LazyFrame> {
let window = self.0;
// Standard EMA formula: alpha = 2 / (span + 1)
let alpha = 2.0 / (window as f64 + 1.0);
let options = EWMOptions {
alpha,
// Use recursive calculation
adjust: false,
// Do not apply statistical sample correction; we want the raw weighted average.
bias: false,
// Don't emit values until we have seen 'window' trades.
// This avoids noisy, highly-volatile values at the start of the stream.
min_periods: window as usize,
// If a price is missing, skip the decay step for that row.
// This ensures the EMA doesn't artificially drop if we have gaps.
ignore_nulls: true,
};
Ok(lf
.sort(
[CanonicalCol::Timestamp],
SortMultipleOptions::default().with_maintain_order(false),
)
.select([
col(CanonicalCol::Timestamp).alias(CanonicalCol::Timestamp),
col(CanonicalCol::Close)
.ewm_mean(options)
.alias(CanonicalCol::Price),
])
.drop_nulls(None))
}
}
impl SmaWindow {
pub fn pre_compute_sma(&self, lf: LazyFrame) -> ChapatyResult<LazyFrame> {
let window = self.0;
let options = RollingOptionsFixedWindow {
window_size: window as usize,
min_periods: window as usize, // Strict: Require full window validity
weights: None, // Standard SMA is unweighted
center: false, // False prevents look-ahead bias
fn_params: None,
};
Ok(lf
.sort(
[CanonicalCol::Timestamp],
SortMultipleOptions::default().with_maintain_order(false),
)
.select([
col(CanonicalCol::Timestamp).alias(CanonicalCol::Timestamp),
col(CanonicalCol::Close)
.rolling_mean(options)
.alias(CanonicalCol::Price),
])
.drop_nulls(None))
}
}
impl RsiWindow {
pub fn pre_compute_rsi(&self, lf: LazyFrame) -> ChapatyResult<LazyFrame> {
let window = self.0;
// Wilder's Smoothing for RSI: alpha = 1 / N
let alpha = 1.0 / (window as f64);
// Wilder's Smoothing (effectively an EMA with alpha = 1/window)
// Note: Some RSI implementations use SMA, but Wilder's is standard.
let options = EWMOptions {
alpha,
// Use recursive calculation
adjust: false,
// Do not apply statistical sample correction; we want the raw weighted average.
bias: false,
// Don't emit values until we have seen 'window' trades.
// This avoids noisy, highly-volatile values at the start of the stream.
min_periods: window as usize,
// If a price is missing, skip the decay step for that row.
// This ensures the EMA doesn't artificially drop if we have gaps.
ignore_nulls: true,
};
let rsi_expr = {
// 1. Calculate the CHANGE (P_t - P_t-1)
let delta = col(CanonicalCol::Close).diff(lit(1), NullBehavior::Ignore);
// 2. Separate Gains (Up moves) and Losses (Down moves)
let gain = delta.clone().clip(lit(0), lit(f64::MAX));
let loss = delta.clip(lit(f64::MIN), lit(0)).abs();
// 3. Apply Wilder's Smoothing
let avg_gain = gain.ewm_mean(options);
let avg_loss = loss.ewm_mean(options);
// 4. Calculate Ratio and Normalize to 0-100
let rs = avg_gain / avg_loss;
lit(100.0) - (lit(100.0) / (lit(1.0) + rs))
};
Ok(lf
.sort(
[CanonicalCol::Timestamp],
SortMultipleOptions::default().with_maintain_order(false),
)
.select([
col(CanonicalCol::Timestamp).alias(CanonicalCol::Timestamp),
rsi_expr.alias(CanonicalCol::Price),
])
.drop_nulls(None))
}
}