Skip to main content

wickra_data/
resample.rs

1//! Resample an existing candle stream from a finer timeframe to a coarser one.
2
3use crate::aggregator::Timeframe;
4use crate::error::{Error, Result};
5use wickra_core::Candle;
6
7/// Roll a stream of candles up to a coarser timeframe.
8///
9/// Used to derive 5m bars from a 1m feed, or 1h bars from 5m bars, without
10/// touching the original tick stream. The output timeframe's bucket must be a
11/// strict multiple of the input timeframe's bucket, but this is not enforced
12/// — callers are responsible for picking sensible aggregations.
13#[derive(Debug, Clone)]
14pub struct Resampler {
15    timeframe: Timeframe,
16    open: Option<RolledBar>,
17}
18
19#[derive(Debug, Clone, Copy)]
20struct RolledBar {
21    bucket_start: i64,
22    open: f64,
23    high: f64,
24    low: f64,
25    close: f64,
26    volume: f64,
27}
28
29impl RolledBar {
30    fn from_candle(c: Candle, bucket_start: i64) -> Self {
31        Self {
32            bucket_start,
33            open: c.open,
34            high: c.high,
35            low: c.low,
36            close: c.close,
37            volume: c.volume,
38        }
39    }
40
41    fn absorb(&mut self, c: Candle) {
42        if c.high > self.high {
43            self.high = c.high;
44        }
45        if c.low < self.low {
46            self.low = c.low;
47        }
48        self.close = c.close;
49        self.volume += c.volume;
50    }
51
52    /// Finalise the rolled bar into a validated [`Candle`].
53    ///
54    /// # Errors
55    /// Returns [`Error::Core`] if the accumulated `volume` is no longer finite.
56    /// `volume` is summed across every absorbed candle, so a long or large run
57    /// can drift it to `inf`; emitting such a candle would silently poison
58    /// every downstream indicator, so it is surfaced instead. The OHLC fields
59    /// are finite and correctly ordered by construction.
60    fn into_candle(self) -> Result<Candle> {
61        Candle::new(
62            self.open,
63            self.high,
64            self.low,
65            self.close,
66            self.volume,
67            self.bucket_start,
68        )
69        .map_err(Error::from)
70    }
71}
72
73impl Resampler {
74    /// Build a resampler targeting the given output timeframe.
75    pub fn new(timeframe: Timeframe) -> Self {
76        Self {
77            timeframe,
78            open: None,
79        }
80    }
81
82    /// Push a finer-grained candle. Returns the coarser candle that just closed,
83    /// if any.
84    ///
85    /// # Errors
86    /// Returns [`Error::Malformed`] if `candle.timestamp` falls into a bucket
87    /// strictly before the currently open bar — out-of-order candles are not
88    /// supported, matching [`crate::aggregator::TickAggregator::push`].
89    pub fn push(&mut self, candle: Candle) -> Result<Option<Candle>> {
90        let bucket = self.timeframe.floor(candle.timestamp);
91        match self.open {
92            Some(mut bar) if bucket == bar.bucket_start => {
93                bar.absorb(candle);
94                self.open = Some(bar);
95                Ok(None)
96            }
97            Some(bar) if bucket > bar.bucket_start => {
98                let closed = bar.into_candle()?;
99                self.open = Some(RolledBar::from_candle(candle, bucket));
100                Ok(Some(closed))
101            }
102            Some(bar) => Err(Error::Malformed(format!(
103                "candle timestamp {} is older than the open bar start {}",
104                candle.timestamp, bar.bucket_start
105            ))),
106            None => {
107                self.open = Some(RolledBar::from_candle(candle, bucket));
108                Ok(None)
109            }
110        }
111    }
112
113    /// Flush the currently open coarser bar, if any.
114    ///
115    /// # Errors
116    /// Returns an error if the open bar's accumulated volume is non-finite
117    /// (see [`RolledBar::into_candle`]).
118    pub fn flush(&mut self) -> Result<Option<Candle>> {
119        self.open.take().map(RolledBar::into_candle).transpose()
120    }
121}
122
123/// Roll an entire iterator of candles into a `Vec` of coarser candles. The final
124/// open bar (if any) is appended via [`Resampler::flush`].
125pub fn resample_all<I>(timeframe: Timeframe, iter: I) -> Result<Vec<Candle>>
126where
127    I: IntoIterator<Item = Result<Candle>>,
128{
129    let mut r = Resampler::new(timeframe);
130    let mut out = Vec::new();
131    for c in iter {
132        let c = c?;
133        if let Some(closed) = r.push(c)? {
134            out.push(closed);
135        }
136    }
137    if let Some(last) = r.flush()? {
138        out.push(last);
139    }
140    Ok(out)
141}
142
143#[cfg(test)]
144mod tests {
145    use super::*;
146
147    fn c(ts: i64, o: f64, h: f64, l: f64, cl: f64, v: f64) -> Candle {
148        Candle::new(o, h, l, cl, v, ts).unwrap()
149    }
150
151    #[test]
152    fn resamples_1m_to_5m() {
153        let tf = Timeframe::new(5).unwrap();
154        let one_m = vec![
155            c(0, 10.0, 11.0, 9.0, 10.5, 10.0),
156            c(1, 10.5, 12.0, 10.0, 11.5, 12.0),
157            c(2, 11.5, 13.0, 11.0, 12.5, 15.0),
158            c(3, 12.5, 12.8, 11.5, 12.0, 8.0),
159            c(4, 12.0, 12.2, 11.0, 11.5, 6.0),
160            c(5, 11.5, 11.9, 11.0, 11.5, 4.0),
161        ];
162        let rolled = resample_all(tf, one_m.into_iter().map(Ok)).unwrap();
163        // First 5 candles share bucket 0 -> aggregate. Last candle opens bucket 5.
164        assert_eq!(rolled.len(), 2);
165        let a = rolled[0];
166        assert_eq!(a.open, 10.0);
167        assert_eq!(a.close, 11.5);
168        assert_eq!(a.high, 13.0);
169        assert_eq!(a.low, 9.0);
170        assert!((a.volume - 51.0).abs() < 1e-12);
171        let b = rolled[1];
172        assert_eq!(b.open, 11.5);
173        assert_eq!(b.timestamp, 5);
174    }
175
176    #[test]
177    fn rejects_out_of_order_candle() {
178        let mut r = Resampler::new(Timeframe::new(5).unwrap());
179        assert!(r.push(c(10, 10.0, 11.0, 9.0, 10.5, 1.0)).unwrap().is_none());
180        // A candle in an earlier bucket than the open bar is rejected.
181        let err = r.push(c(2, 10.0, 11.0, 9.0, 10.5, 1.0)).unwrap_err();
182        assert!(matches!(err, Error::Malformed(_)));
183    }
184
185    #[test]
186    fn same_bucket_candles_aggregate() {
187        let mut r = Resampler::new(Timeframe::new(5).unwrap());
188        assert!(r.push(c(0, 10.0, 11.0, 9.0, 10.5, 1.0)).unwrap().is_none());
189        assert!(r.push(c(3, 10.5, 12.0, 10.0, 11.0, 1.0)).unwrap().is_none());
190        let bar = r.flush().unwrap().unwrap();
191        assert_eq!(bar.high, 12.0);
192        assert_eq!(bar.low, 9.0);
193    }
194
195    #[test]
196    fn absorb_lowers_low_on_dipping_candle() {
197        // The first candle in the bucket sets low = 10.0; the second dips to
198        // 8.0 and must overwrite. Exercises the `c.low < self.low` branch in
199        // RolledBar::absorb that the other resampler tests never trigger
200        // because their follow-up candles always have a higher low.
201        let mut r = Resampler::new(Timeframe::new(5).unwrap());
202        r.push(c(0, 10.0, 11.0, 10.0, 10.5, 1.0)).unwrap();
203        r.push(c(1, 10.5, 11.5, 8.0, 9.0, 1.0)).unwrap();
204        let bar = r.flush().unwrap().unwrap();
205        assert_eq!(bar.low, 8.0);
206        assert_eq!(bar.high, 11.5);
207    }
208
209    #[test]
210    fn flushes_a_non_finite_volume_as_an_error() {
211        let mut r = Resampler::new(Timeframe::new(5).unwrap());
212        // Two near-max volumes in the same bucket sum to +inf.
213        assert!(r
214            .push(c(0, 10.0, 11.0, 9.0, 10.5, f64::MAX))
215            .unwrap()
216            .is_none());
217        assert!(r
218            .push(c(1, 10.0, 11.0, 9.0, 10.5, f64::MAX))
219            .unwrap()
220            .is_none());
221        let err = r.flush().unwrap_err();
222        assert!(matches!(err, Error::Core(_)));
223    }
224}