1use crate::aggregator::Timeframe;
4use crate::error::{Error, Result};
5use wickra_core::Candle;
6
7#[derive(Debug, Clone)]
14pub struct Resampler {
15 timeframe: Timeframe,
16 open: Option<RolledBar>,
17}
18
19#[derive(Debug, Clone, Copy)]
20struct RolledBar {
21 bucket_start: i64,
22 open: f64,
23 high: f64,
24 low: f64,
25 close: f64,
26 volume: f64,
27}
28
29impl RolledBar {
30 fn from_candle(c: Candle, bucket_start: i64) -> Self {
31 Self {
32 bucket_start,
33 open: c.open,
34 high: c.high,
35 low: c.low,
36 close: c.close,
37 volume: c.volume,
38 }
39 }
40
41 fn absorb(&mut self, c: Candle) {
42 if c.high > self.high {
43 self.high = c.high;
44 }
45 if c.low < self.low {
46 self.low = c.low;
47 }
48 self.close = c.close;
49 self.volume += c.volume;
50 }
51
52 fn into_candle(self) -> Result<Candle> {
61 Candle::new(
62 self.open,
63 self.high,
64 self.low,
65 self.close,
66 self.volume,
67 self.bucket_start,
68 )
69 .map_err(Error::from)
70 }
71}
72
73impl Resampler {
74 pub fn new(timeframe: Timeframe) -> Self {
76 Self {
77 timeframe,
78 open: None,
79 }
80 }
81
82 pub fn push(&mut self, candle: Candle) -> Result<Option<Candle>> {
90 let bucket = self.timeframe.floor(candle.timestamp);
91 match self.open {
92 Some(mut bar) if bucket == bar.bucket_start => {
93 bar.absorb(candle);
94 self.open = Some(bar);
95 Ok(None)
96 }
97 Some(bar) if bucket > bar.bucket_start => {
98 let closed = bar.into_candle()?;
99 self.open = Some(RolledBar::from_candle(candle, bucket));
100 Ok(Some(closed))
101 }
102 Some(bar) => Err(Error::Malformed(format!(
103 "candle timestamp {} is older than the open bar start {}",
104 candle.timestamp, bar.bucket_start
105 ))),
106 None => {
107 self.open = Some(RolledBar::from_candle(candle, bucket));
108 Ok(None)
109 }
110 }
111 }
112
113 pub fn flush(&mut self) -> Result<Option<Candle>> {
119 self.open.take().map(RolledBar::into_candle).transpose()
120 }
121}
122
123pub fn resample_all<I>(timeframe: Timeframe, iter: I) -> Result<Vec<Candle>>
126where
127 I: IntoIterator<Item = Result<Candle>>,
128{
129 let mut r = Resampler::new(timeframe);
130 let mut out = Vec::new();
131 for c in iter {
132 let c = c?;
133 if let Some(closed) = r.push(c)? {
134 out.push(closed);
135 }
136 }
137 if let Some(last) = r.flush()? {
138 out.push(last);
139 }
140 Ok(out)
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146
147 fn c(ts: i64, o: f64, h: f64, l: f64, cl: f64, v: f64) -> Candle {
148 Candle::new(o, h, l, cl, v, ts).unwrap()
149 }
150
151 #[test]
152 fn resamples_1m_to_5m() {
153 let tf = Timeframe::new(5).unwrap();
154 let one_m = vec![
155 c(0, 10.0, 11.0, 9.0, 10.5, 10.0),
156 c(1, 10.5, 12.0, 10.0, 11.5, 12.0),
157 c(2, 11.5, 13.0, 11.0, 12.5, 15.0),
158 c(3, 12.5, 12.8, 11.5, 12.0, 8.0),
159 c(4, 12.0, 12.2, 11.0, 11.5, 6.0),
160 c(5, 11.5, 11.9, 11.0, 11.5, 4.0),
161 ];
162 let rolled = resample_all(tf, one_m.into_iter().map(Ok)).unwrap();
163 assert_eq!(rolled.len(), 2);
165 let a = rolled[0];
166 assert_eq!(a.open, 10.0);
167 assert_eq!(a.close, 11.5);
168 assert_eq!(a.high, 13.0);
169 assert_eq!(a.low, 9.0);
170 assert!((a.volume - 51.0).abs() < 1e-12);
171 let b = rolled[1];
172 assert_eq!(b.open, 11.5);
173 assert_eq!(b.timestamp, 5);
174 }
175
176 #[test]
177 fn rejects_out_of_order_candle() {
178 let mut r = Resampler::new(Timeframe::new(5).unwrap());
179 assert!(r.push(c(10, 10.0, 11.0, 9.0, 10.5, 1.0)).unwrap().is_none());
180 let err = r.push(c(2, 10.0, 11.0, 9.0, 10.5, 1.0)).unwrap_err();
182 assert!(matches!(err, Error::Malformed(_)));
183 }
184
185 #[test]
186 fn same_bucket_candles_aggregate() {
187 let mut r = Resampler::new(Timeframe::new(5).unwrap());
188 assert!(r.push(c(0, 10.0, 11.0, 9.0, 10.5, 1.0)).unwrap().is_none());
189 assert!(r.push(c(3, 10.5, 12.0, 10.0, 11.0, 1.0)).unwrap().is_none());
190 let bar = r.flush().unwrap().unwrap();
191 assert_eq!(bar.high, 12.0);
192 assert_eq!(bar.low, 9.0);
193 }
194
195 #[test]
196 fn absorb_lowers_low_on_dipping_candle() {
197 let mut r = Resampler::new(Timeframe::new(5).unwrap());
202 r.push(c(0, 10.0, 11.0, 10.0, 10.5, 1.0)).unwrap();
203 r.push(c(1, 10.5, 11.5, 8.0, 9.0, 1.0)).unwrap();
204 let bar = r.flush().unwrap().unwrap();
205 assert_eq!(bar.low, 8.0);
206 assert_eq!(bar.high, 11.5);
207 }
208
209 #[test]
210 fn flushes_a_non_finite_volume_as_an_error() {
211 let mut r = Resampler::new(Timeframe::new(5).unwrap());
212 assert!(r
214 .push(c(0, 10.0, 11.0, 9.0, 10.5, f64::MAX))
215 .unwrap()
216 .is_none());
217 assert!(r
218 .push(c(1, 10.0, 11.0, 9.0, 10.5, f64::MAX))
219 .unwrap()
220 .is_none());
221 let err = r.flush().unwrap_err();
222 assert!(matches!(err, Error::Core(_)));
223 }
224}