1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//! Candle data structures.
#![feature(try_trait)]

extern crate cxmr_feeds;
#[cfg(feature = "analysis")]
extern crate cxmr_ta;

mod candle;
mod csv;
mod util;

pub use self::candle::*;
pub use self::csv::*;
pub use self::util::*;

use cxmr_feeds::EventData;

/// Candle collector update klind structure.
#[derive(Debug, Clone, PartialEq)]
pub enum UpdateKind {
    /// Candle low.
    Low,
    /// Candle high.
    High,
    /// Candle close.
    Close,
}

impl UpdateKind {
    /// Returns true if kind is close.
    pub fn is_close(&self) -> bool {
        match self {
            UpdateKind::Close => true,
            _ => false,
        }
    }
}

/// Candle collector update state structure.
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateState(pub Candle, pub UpdateKind);

impl UpdateState {
    /// Returns true if state is close.
    pub fn is_close(&self) -> bool {
        self.1.is_close()
    }

    /// Returns inner candle if state is close.
    pub fn into_close(self) -> Option<Candle> {
        match self.1 {
            UpdateKind::Close => Some(self.0),
            _ => None,
        }
    }
}

/// Candle collector.
#[derive(Default)]
pub struct CandleCollector {
    pub interval: u64,
    pub candle: Candle,
    pub close_only: bool,
}

impl CandleCollector {
    /// Creates new snapshot collector with interval.
    pub fn new(interval: u64) -> CandleCollector {
        CandleCollector {
            interval,
            ..CandleCollector::default()
        }
    }

    /// Collects data into a candle and returns if one is filled.
    pub fn update(&mut self, event: &EventData) -> Option<UpdateState> {
        if self.candle.time == 0 {
            self.candle.time = round_open(event.ts, self.interval);
        }
        let candle = self.try_pop(event.ts);
        let update = if event.is_trade {
            self.candle.update(event.rate, event.amount as f64)
        } else {
            None
        };
        if let Some(candle) = candle {
            Some(UpdateState(candle, UpdateKind::Close))
        } else if !self.close_only || update == Some(UpdateKind::Close) {
            let update = update?;
            Some(UpdateState(self.candle.clone(), update))
        } else {
            None
        }
    }

    /// Creates a snapshot from data rows.
    pub fn consume(mut self, events: &Vec<EventData>) -> Vec<Candle> {
        let mut candles: Vec<_> = events
            .iter()
            .filter_map(|ev| self.update(ev).and_then(|update| update.into_close()))
            .collect();
        if self.candle.is_filled() {
            candles.push(self.candle);
        }
        candles
    }

    /// Creates candles with updates from data rows.
    pub fn consume_updates(mut self, events: &Vec<EventData>) -> Vec<UpdateState> {
        let mut candles: Vec<_> = events.iter().filter_map(|ev| self.update(ev)).collect();
        if self.candle.is_filled() {
            candles.push(UpdateState(self.candle, UpdateKind::Close));
        }
        candles
    }

    /// Pops a candle from collector if timestamp can close it.
    pub fn try_pop(&mut self, ts: u64) -> Option<Candle> {
        if self.candle.time != 0 && ts > self.candle.time + self.interval {
            let mut prev_candle = Candle::new(self.candle.close, self.candle.time + self.interval);
            std::mem::swap(&mut self.candle, &mut prev_candle);
            if prev_candle.open != 0.0 {
                Some(prev_candle)
            } else {
                None
            }
        } else {
            None
        }
    }

    /// Returns last unclosed candle if it's at least partially filled.
    pub fn finish(self) -> Option<Candle> {
        if self.candle.is_filled() {
            Some(self.candle)
        } else {
            None
        }
    }
}