1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#![feature(try_trait)]
extern crate cxmr_feeds;
#[cfg(feature = "analysis")]
extern crate cxmr_ta;
mod candle;
mod csv;
mod util;
pub use self::candle::*;
pub use self::csv::*;
pub use self::util::*;
use cxmr_feeds::EventData;
#[derive(Debug, Clone, PartialEq)]
pub enum UpdateKind {
Low,
High,
Close,
}
impl UpdateKind {
pub fn is_close(&self) -> bool {
match self {
UpdateKind::Close => true,
_ => false,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateState(pub Candle, pub UpdateKind);
impl UpdateState {
pub fn is_close(&self) -> bool {
self.1.is_close()
}
pub fn into_close(self) -> Option<Candle> {
match self.1 {
UpdateKind::Close => Some(self.0),
_ => None,
}
}
}
#[derive(Default)]
pub struct CandleCollector {
pub interval: u64,
pub candle: Candle,
pub close_only: bool,
}
impl CandleCollector {
pub fn new(interval: u64) -> CandleCollector {
CandleCollector {
interval,
..CandleCollector::default()
}
}
pub fn update(&mut self, event: &EventData) -> Option<UpdateState> {
if self.candle.time == 0 {
self.candle.time = round_open(event.ts, self.interval);
}
let candle = self.try_pop(event.ts);
let update = if event.is_trade {
self.candle.update(event.rate, event.amount as f64)
} else {
None
};
if let Some(candle) = candle {
Some(UpdateState(candle, UpdateKind::Close))
} else if !self.close_only || update == Some(UpdateKind::Close) {
let update = update?;
Some(UpdateState(self.candle.clone(), update))
} else {
None
}
}
pub fn consume(mut self, events: &Vec<EventData>) -> Vec<Candle> {
let mut candles: Vec<_> = events
.iter()
.filter_map(|ev| self.update(ev).and_then(|update| update.into_close()))
.collect();
if self.candle.is_filled() {
candles.push(self.candle);
}
candles
}
pub fn consume_updates(mut self, events: &Vec<EventData>) -> Vec<UpdateState> {
let mut candles: Vec<_> = events.iter().filter_map(|ev| self.update(ev)).collect();
if self.candle.is_filled() {
candles.push(UpdateState(self.candle, UpdateKind::Close));
}
candles
}
pub fn try_pop(&mut self, ts: u64) -> Option<Candle> {
if self.candle.time != 0 && ts > self.candle.time + self.interval {
let mut prev_candle = Candle::new(self.candle.close, self.candle.time + self.interval);
std::mem::swap(&mut self.candle, &mut prev_candle);
if prev_candle.open != 0.0 {
Some(prev_candle)
} else {
None
}
} else {
None
}
}
pub fn finish(self) -> Option<Candle> {
if self.candle.is_filled() {
Some(self.candle)
} else {
None
}
}
}