Skip to main content

indicators/signal/
cvd.rs

1//! Layer 8 — Cumulative Volume Delta (OHLCV heuristic).
2//!
3//! Estimates buy/sell volume from OHLCV bars and tracks cumulative delta,
4//! slope, and price-CVD divergence.
5
6use std::collections::{HashMap, VecDeque};
7
8use chrono::{NaiveDate, TimeZone, Utc};
9
10use crate::error::IndicatorError;
11use crate::indicator::{Indicator, IndicatorOutput};
12use crate::registry::param_usize;
13use crate::types::Candle;
14
15// ── Params ────────────────────────────────────────────────────────────────────
16
17#[derive(Debug, Clone)]
18pub struct CvdParams {
19    pub slope_bars: usize,
20    pub div_lookback: usize,
21}
22
23impl Default for CvdParams {
24    fn default() -> Self {
25        Self {
26            slope_bars: 10,
27            div_lookback: 20,
28        }
29    }
30}
31
32// ── Indicator wrapper ─────────────────────────────────────────────────────────
33
34/// Batch `Indicator` adapter for [`CVDTracker`].
35#[derive(Debug, Clone)]
36pub struct CvdIndicator {
37    pub params: CvdParams,
38}
39
40impl CvdIndicator {
41    pub fn new(params: CvdParams) -> Self {
42        Self { params }
43    }
44}
45
46impl Indicator for CvdIndicator {
47    fn name(&self) -> &'static str {
48        "CVD"
49    }
50    fn required_len(&self) -> usize {
51        self.params.div_lookback + 1
52    }
53    fn required_columns(&self) -> &[&'static str] {
54        &["open", "high", "low", "close", "volume"]
55    }
56
57    fn calculate(&self, candles: &[Candle]) -> Result<IndicatorOutput, IndicatorError> {
58        self.check_len(candles)?;
59        let p = &self.params;
60        let mut tracker = CVDTracker::new(p.slope_bars, p.div_lookback);
61        let n = candles.len();
62        let mut cvd_out = vec![f64::NAN; n];
63        let mut slope = vec![f64::NAN; n];
64        let mut div_out = vec![f64::NAN; n];
65        for (i, c) in candles.iter().enumerate() {
66            tracker.update(c);
67            cvd_out[i] = tracker.cvd;
68            slope[i] = tracker.cvd_slope;
69            div_out[i] = tracker.divergence as f64;
70        }
71        Ok(IndicatorOutput::from_pairs([
72            ("cvd", cvd_out),
73            ("cvd_slope", slope),
74            ("cvd_div", div_out),
75        ]))
76    }
77}
78
79// ── Registry factory ──────────────────────────────────────────────────────────
80
81pub fn factory<S: ::std::hash::BuildHasher>(
82    params: &HashMap<String, String, S>,
83) -> Result<Box<dyn Indicator>, IndicatorError> {
84    let slope_bars = param_usize(params, "slope_bars", 10)?;
85    let div_lookback = param_usize(params, "div_lookback", 20)?;
86    Ok(Box::new(CvdIndicator::new(CvdParams {
87        slope_bars,
88        div_lookback,
89    })))
90}
91
92#[derive(Debug)]
93pub struct CVDTracker {
94    slope_bars: usize,
95    div_lookback: usize,
96
97    day_cvd: f64,
98    last_date: Option<NaiveDate>,
99    cvd_hist: VecDeque<f64>,
100    price_hist: VecDeque<f64>,
101
102    pub cvd: f64,
103    pub delta: f64,
104    pub cvd_slope: f64,
105    pub bullish: bool,
106    /// `+1` = bullish divergence, `-1` = bearish divergence, `0` = none.
107    pub divergence: i8,
108}
109
110impl CVDTracker {
111    pub fn new(slope_bars: usize, div_lookback: usize) -> Self {
112        let cap = (div_lookback + 10).max(50);
113        Self {
114            slope_bars,
115            div_lookback,
116            day_cvd: 0.0,
117            last_date: None,
118            cvd_hist: VecDeque::with_capacity(cap),
119            price_hist: VecDeque::with_capacity(cap),
120            cvd: 0.0,
121            delta: 0.0,
122            cvd_slope: 0.0,
123            bullish: false,
124            divergence: 0,
125        }
126    }
127
128    pub fn update(&mut self, candle: &Candle) {
129        let dt = Utc
130            .timestamp_millis_opt(candle.time)
131            .single()
132            .unwrap_or_else(Utc::now);
133        let date = dt.date_naive();
134
135        if Some(date) != self.last_date {
136            self.day_cvd = 0.0;
137            self.last_date = Some(date);
138        }
139
140        let bar_rng = candle.high - candle.low;
141        let buy_vol = if bar_rng > 0.0 {
142            candle.volume * (candle.close - candle.low) / bar_rng
143        } else {
144            candle.volume * 0.5
145        };
146        self.delta = buy_vol - (candle.volume - buy_vol);
147        self.day_cvd += self.delta;
148        self.cvd = self.day_cvd;
149
150        let cap = self.cvd_hist.capacity();
151        if self.cvd_hist.len() == cap {
152            self.cvd_hist.pop_front();
153        }
154        if self.price_hist.len() == cap {
155            self.price_hist.pop_front();
156        }
157        self.cvd_hist.push_back(self.cvd);
158        self.price_hist.push_back(candle.close);
159
160        if self.cvd_hist.len() >= self.slope_bars {
161            let arr: Vec<f64> = self.cvd_hist.iter().copied().collect();
162            self.cvd_slope = arr[arr.len() - 1] - arr[arr.len() - self.slope_bars];
163        }
164        self.bullish = self.cvd_slope > 0.0;
165        self.divergence = self.check_divergence();
166    }
167
168    fn check_divergence(&self) -> i8 {
169        let n = self.cvd_hist.len().min(self.div_lookback);
170        if n < 10 {
171            return 0;
172        }
173        let prices: Vec<f64> = self.price_hist.iter().rev().take(n).copied().collect();
174        let cvds: Vec<f64> = self.cvd_hist.iter().rev().take(n).copied().collect();
175
176        let last_p = prices[0];
177        let last_c = cvds[0];
178
179        // Bullish divergence: price at new low but CVD is not
180        let min_p = prices[1..].iter().copied().fold(f64::INFINITY, f64::min);
181        let min_c = cvds[1..].iter().copied().fold(f64::INFINITY, f64::min);
182        if last_p < min_p && last_c > min_c {
183            return 1;
184        }
185
186        // Bearish divergence: price at new high but CVD is not
187        let max_p = prices[1..]
188            .iter()
189            .copied()
190            .fold(f64::NEG_INFINITY, f64::max);
191        let max_c = cvds[1..].iter().copied().fold(f64::NEG_INFINITY, f64::max);
192        if last_p > max_p && last_c < max_c {
193            return -1;
194        }
195
196        0
197    }
198}