trade_aggregation/aggregation_rules/
aligned_time_rule.rs

1use crate::{AggregationRule, MillisecondPeriod, ModularCandle, TakerTrade, TimestampResolution};
2
3/// The classic time based aggregation rule,
4/// creating a new candle every n seconds.  The time trigger is aligned such that
5/// the trigger points are starting from a time equals zero.  For example, if the first
6/// tick comes in a 1:32:00 on a 5 minute candle, that first candle will only contain
7/// 3 minutes of trades, representing a 1:30 start.
8#[derive(Debug, Clone)]
9pub struct AlignedTimeRule {
10    // The timestamp this rule uses as a reference
11    reference_timestamp: i64,
12
13    // The period for the candle in seconds
14    // constants can be used nicely here from constants.rs
15    // e.g.: M1 -> 1 minute candles
16    period_in_units_from_trade: i64,
17}
18
19impl AlignedTimeRule {
20    /// Create a new instance of the aligned time rule,
21    /// with a given candle period in seconds
22    ///
23    /// # Arguments:
24    /// period_s: How many seconds a candle will contain
25    /// ts_res: The resolution each Trade timestamp will have
26    ///
27    pub fn new(
28        period_ms: MillisecondPeriod,
29        trade_timestamp_resolution: TimestampResolution,
30    ) -> Self {
31        use TimestampResolution::*;
32        let ts_multiplier = match trade_timestamp_resolution {
33            Millisecond => 1,
34            Microsecond => 1_000,
35            Nanosecond => 1_000_000,
36        };
37
38        Self {
39            reference_timestamp: 0,
40            period_in_units_from_trade: period_ms.get() as i64 * ts_multiplier,
41        }
42    }
43
44    /// Calculates the "aligned" timestamp, which the rule will use when receiving
45    /// for determining the trigger.  This is done at the initialization of
46    /// each period.
47    #[must_use]
48    pub fn aligned_timestamp(&self, timestamp: i64) -> i64 {
49        timestamp - (timestamp % self.period_in_units_from_trade)
50    }
51}
52
53impl<C, T> AggregationRule<C, T> for AlignedTimeRule
54where
55    C: ModularCandle<T>,
56    T: TakerTrade,
57{
58    fn should_trigger(&mut self, trade: &T, _candle: &C) -> bool {
59        if self.reference_timestamp == 0 {
60            self.reference_timestamp = self.aligned_timestamp(trade.timestamp());
61            return false;
62        }
63
64        let should_trigger =
65            trade.timestamp() - self.reference_timestamp >= self.period_in_units_from_trade;
66        if should_trigger {
67            // Advance the trigger timestamp to the next period.
68            // If the period is too small, then the trade timestamps will out-pace and always trigger.
69            // Alternatively doing `self.reference_timestamp = self.aligned_timestamp(trade.timestamp())` will cause drift
70            // and not produce enough samples.
71            self.reference_timestamp = self.reference_timestamp + self.period_in_units_from_trade
72        }
73
74        should_trigger
75    }
76}
77
78#[cfg(test)]
79mod tests {
80    use trade_aggregation_derive::Candle;
81
82    use super::*;
83    use crate::{
84        aggregate_all_trades,
85        candle_components::{
86            CandleComponent, CandleComponentUpdate, Close, NumTrades, Open, Volume,
87        },
88        load_trades_from_csv,
89        plot::OhlcCandle,
90        GenericAggregator, ModularCandle, TimestampResolution, Trade, M1, M15,
91    };
92
93    #[derive(Default, Debug, Clone, Candle)]
94    struct MyCandle {
95        open: Open,
96        close: Close,
97        num_trades: NumTrades<u32>,
98        volume: Volume,
99    }
100
101    #[test]
102    fn aligned_time_rule() {
103        let trades = load_trades_from_csv("data/Bitmex_XBTUSD_1M.csv").unwrap();
104
105        let mut aggregator = GenericAggregator::<MyCandle, AlignedTimeRule, Trade>::new(
106            AlignedTimeRule::new(M15, TimestampResolution::Millisecond),
107            false,
108        );
109        let candles = aggregate_all_trades(&trades, &mut aggregator);
110        assert_eq!(candles.len(), 396);
111
112        // make sure that the aggregator starts a new candle with the "trigger tick",
113        // and includes that information of the trade that triggered the new candle as well
114        let c = &candles[0];
115        assert_eq!(c.open(), 13873.0);
116        assert_eq!(c.close(), 13769.0);
117        let c = &candles[1];
118        assert_eq!(c.open(), 13768.5);
119        assert_eq!(c.close(), 13721.5);
120    }
121
122    #[test]
123    fn aligned_time_rule_volume() {
124        let trades = load_trades_from_csv("data/Bitstamp_BTCEUR_1M.csv").unwrap();
125
126        let mut aggregator = GenericAggregator::<MyCandle, AlignedTimeRule, Trade>::new(
127            AlignedTimeRule::new(M1, TimestampResolution::Microsecond),
128            false,
129        );
130        let candles = aggregate_all_trades(&trades, &mut aggregator);
131
132        let c = &candles[0];
133        assert_eq!(c.num_trades(), 10);
134        assert_eq!(c.volume(), 0.27458132);
135    }
136
137    #[test]
138    fn aligned_time_rule_trigger_on_0() {
139        let trades: [Trade; 5] = [
140            Trade {
141                timestamp: 1712656800000,
142                price: 100.0,
143                size: 10.0,
144            },
145            Trade {
146                timestamp: 1712656815000,
147                price: 101.0,
148                size: -10.0,
149            },
150            Trade {
151                timestamp: 1712656860000,
152                price: 100.5,
153                size: -10.0,
154            },
155            Trade {
156                timestamp: 1712656860001,
157                price: 102.0,
158                size: -10.0,
159            },
160            Trade {
161                timestamp: 1712656935000,
162                price: 105.0,
163                size: -10.0,
164            },
165        ];
166
167        let mut aggregator = GenericAggregator::<OhlcCandle, AlignedTimeRule, Trade>::new(
168            AlignedTimeRule::new(M1, TimestampResolution::Millisecond),
169            false,
170        );
171        let candles = aggregate_all_trades(&trades, &mut aggregator);
172        assert_eq!(candles.len(), 2);
173        assert_eq!(candles[0].open(), 100.00);
174        assert_eq!(candles[0].close(), 101.00);
175        assert_eq!(candles[1].open(), 100.5);
176        assert_eq!(candles[1].close(), 102.00);
177    }
178
179    #[test]
180    fn aligned_time_rule_candle_with_one_trade() {
181        let trades: [Trade; 4] = [
182            Trade {
183                timestamp: 1712656800000,
184                price: 100.0,
185                size: 10.0,
186            },
187            Trade {
188                timestamp: 1712656815000,
189                price: 101.0,
190                size: -10.0,
191            },
192            Trade {
193                timestamp: 1712656861000,
194                price: 100.5,
195                size: -10.0,
196            },
197            Trade {
198                timestamp: 1712657930000,
199                price: 102.0,
200                size: -10.0,
201            },
202        ];
203
204        let mut aggregator = GenericAggregator::<OhlcCandle, AlignedTimeRule, Trade>::new(
205            AlignedTimeRule::new(M1, TimestampResolution::Millisecond),
206            false,
207        );
208        let candles = aggregate_all_trades(&trades, &mut aggregator);
209        assert_eq!(candles.len(), 2);
210        assert_eq!(candles[0].open(), 100.0);
211        assert_eq!(candles[0].close(), 101.0);
212        assert_eq!(candles[1].open(), 100.5);
213        assert_eq!(candles[1].close(), 100.5);
214    }
215}