use std::collections::HashSet;
use super::{Candle, Time, Scale, Sampler};
use crate::dtf::update::Update;
use crate::utils::fill_digits;
use indexmap::IndexMap;
pub struct TimeSampler {
s: u64,
last: Option<u64>,
}
impl TimeSampler {
pub fn new(s: u64) -> Self {
Self {
s,
last: None,
}
}
}
impl Sampler for TimeSampler {
fn reset(&mut self) {
self.last = None;
}
fn is_sample(&mut self, trade: &Update) -> bool {
let ts = (fill_digits(trade.ts) / 1000 / self.s * self.s) as Time;
if self.last.is_none() {
self.last = Some(ts);
false
} else {
let last = self.last.unwrap();
if last != ts {
self.last = Some(ts);
true
} else {
false
}
}
}
}
pub struct TimeBarsIter<I:Iterator<Item=Update>> {
it: I,
current_candle: Option<Candle>,
sampler: TimeSampler,
seconds: u64,
}
impl<I:Iterator<Item=Update>> TimeBarsIter<I> {
pub fn new(it: I, seconds: u64) -> Self {
Self {
it,
current_candle: None,
sampler: TimeSampler::new(seconds),
seconds,
}
}
}
fn new_candle(t: Time, trade: Update) -> Candle {
Candle {
start: t,
end: t,
volume: trade.size,
high: trade.price,
low: trade.price,
close: trade.price,
open: trade.price,
}
}
impl<I:Iterator<Item=Update>> Iterator for TimeBarsIter<I> {
type Item = Candle;
fn next(&mut self) -> Option<Self::Item> {
while let Some(trade) = self.it.next() {
if !trade.is_trade {
continue;
}
let ts = (fill_digits(trade.ts) / 1000 / self.seconds * self.seconds) as Time;
self.current_candle = if let Some(c) = &self.current_candle {
if self.sampler.is_sample(&trade) {
let c = *c;
self.current_candle = Some(new_candle(ts, trade));
return Some(c);
} else {
Some(Candle {
start: ts,
end: ts,
volume: c.volume + trade.size,
high: trade.price.max(c.high),
low: trade.price.min(c.low),
close: trade.price,
open: c.open,
})
}
} else {
Some(new_candle(ts, trade))
};
}
None
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct TimeBars {
v: IndexMap<Time, Candle>,
scale: Scale,
}
impl<'a> From<&'a [Update]> for TimeBars {
fn from(ups: &[Update]) -> TimeBars {
let candles = TimeBarsIter::new(ups.iter().copied(), 60).map(|c| (c.start, c)).collect();
return TimeBars::new(candles, 1);
}
}
impl TimeBars {
pub fn get_size(&self) -> usize {
self.v.len()
}
pub fn get_candles<'a>(&'a self) -> indexmap::map::Values<'a, Time, Candle> {
self.v.values()
}
pub fn get_scale(&self) -> Scale {
self.scale
}
pub fn to_csv(&self) -> String {
let csvs: Vec<String> = self.v
.iter()
.map(|(key, candle)| format!("{},{}", key, candle.to_csv()))
.collect();
csvs.join("\n")
}
fn missing_epochs(&self) -> Vec<Time> {
let mut set = HashSet::<Time>::new();
let mut missing = Vec::<Time>::new();
for &ts in self.v.keys() {
set.insert(ts);
}
let &max_epoch = self.v.keys().next_back().unwrap();
let &min_epoch = self.v.keys().next().unwrap();
let mut it = min_epoch;
while it < max_epoch {
if !set.contains(&it) {
missing.push(it);
}
it += (self.scale as Time) * 60;
}
missing
}
pub fn missing_ranges(&self) -> Vec<(Time, Time)> {
ranges(&self.missing_epochs())
}
pub fn insert_continuation_candles(&mut self) {
let (mut last_ts, mut last_close) = {
let (&last_ts, row) = self.v.iter().next().unwrap(); (last_ts, row.close)
};
let mut temp = IndexMap::<Time, Candle>::new();
for (&ts, row) in self.v.iter() {
if (ts != last_ts + 60) && (last_ts != 0) && (last_ts != ts) {
let mut cur = last_ts + 60;
while cur < ts {
temp.insert(
cur,
Candle {
start: cur,
end: cur + 60,
volume: 0.,
high: last_close,
low: last_close,
open: last_close,
close: last_close,
},
);
cur += 60;
}
}
last_ts = ts;
last_close = row.close;
}
self.v.extend(temp);
}
fn new(v: IndexMap<Time, Candle>, scale: u16) -> TimeBars {
let ret = TimeBars { v, scale };
ret
}
fn _test_epochs_must_be_sequential(&self) -> bool {
let mut i: Time = 0;
let &first = self.v.keys().next().unwrap();
for &row in self.v.keys() {
if first + i * 60 * (self.scale as Time) != row {
return false;
}
i += 1;
}
true
}
pub fn rebin(self, align: bool, new_scale: u16) -> Option<TimeBars> {
if new_scale < self.scale {
return None;
} else if new_scale == self.scale {
return Some(self);
}
let mut res = IndexMap::new();
let mut startacc = 0;
let mut openacc = 0.;
let mut highacc = 0.;
let mut lowacc = 0.;
let mut volumeacc = 0.;
let mut aligned = false;
let mut i = 0;
for (&ts, row) in self.v.iter() {
if align && !aligned {
let snap_point = (ts / (self.scale as Time * 60)) * (self.scale as Time * 60);
if ts == snap_point {
aligned = true;
i = 0;
} else {
continue;
}
}
if i % new_scale as usize == 0 {
startacc = ts;
openacc = row.open;
highacc = row.high;
lowacc = row.low;
volumeacc = row.volume;
i += 1;
continue;
}
highacc = highacc.max(row.high);
lowacc = lowacc.min(row.low);
volumeacc += row.volume;
if (i % (new_scale as usize)) == ((new_scale as usize) - 1) {
let candle = Candle {
start: startacc,
end: ts,
open: openacc,
high: highacc,
low: lowacc,
close: row.close,
volume: volumeacc,
};
res.insert(startacc, candle);
}
i += 1;
}
assert_eq!(res.len(), self.v.len() / (new_scale as usize));
debug_assert!(self._test_epochs_must_be_sequential());
Some(TimeBars {
v: res,
scale: new_scale,
})
}
}
fn ranges(lst: &Vec<Time>) -> Vec<(Time, Time)> {
let mut pos = Vec::new();
for (i, j) in lst.iter().enumerate() {
pos.push(j / 60 - i as Time);
}
let mut ret = Vec::new();
let mut t = 0;
let n_groups = {
if pos.len() == 0 {
vec![]
} else {
let mut n_groups = vec![];
let mut prev = pos[0];
let mut count = 0;
for &num in pos.iter() {
if num != prev {
n_groups.push(count);
count = 0;
} else {
count += 1;
}
prev = num;
}
if n_groups.len() == 0 {
n_groups.push(count);
} else {
n_groups.push(count + 1);
}
n_groups
}
};
for &l in n_groups.iter() {
let el = lst.get(t).unwrap();
t += l;
ret.push((el.clone(), el + 60 * (l - 1) as Time));
}
ret
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_candle_to_csv() {
let inp = Candle {
start: 0,
end: 0,
open: 0.,
close: 0.,
high: 0.,
low: 0.,
volume: 0.,
};
let target = "0,0,0,0,0,0,0";
assert_eq!(inp.to_csv(), target);
}
#[test]
fn test_candle_snap_to_grid() {
let mut v = IndexMap::<Time, Candle>::new();
for i in 30..121 {
let j = 60 * i;
v.insert(
j,
Candle {
start: j,
end: j + 60,
open: 0.,
close: 1.,
high: 2.,
low: 0.,
volume: 1.,
},
);
}
let candles = TimeBars::new(v, 1);
let mut tree = IndexMap::new();
tree.insert(
1800,
Candle {
start: 1800,
end: 5340,
open: 0.,
high: 2.,
low: 0.,
close: 1.,
volume: 60.,
},
);
assert_eq!(
TimeBars { v: tree, scale: 60 },
candles.rebin(true, 60).unwrap()
);
}
#[test]
fn test_create_new_candles() {
assert_eq!(
TimeBars::new(IndexMap::new(), 1),
TimeBars {
v: IndexMap::new(),
scale: 1,
}
);
}
#[test]
fn test_fix_missing_candles() {
let mut v = IndexMap::new();
for i in 30..121 {
if i >= 50 && i <= 60 {
continue;
}
let j = 60 * i;
v.insert(
j,
Candle {
start: j,
end: j + 60,
open: 0.,
close: 1.,
high: 2.,
low: 0.,
volume: 1.,
},
);
}
let mut candles = TimeBars::new(v, 1);
assert_eq!(
vec![
3000,
3060,
3120,
3180,
3240,
3300,
3360,
3420,
3480,
3540,
3600,
],
candles.missing_epochs()
);
assert_eq!(vec![(3000, 3600)], candles.missing_ranges());
candles.insert_continuation_candles();
assert_eq!(Vec::<Time>::new(), candles.missing_epochs());
assert_eq!(Vec::<(Time, Time)>::new(), candles.missing_ranges());
}
#[test]
fn test_ranges() {
let v: Vec<Time> = vec![60, 120, 180, 600, 660, 720];
let result = ranges(&v);
let shouldbe: Vec<(Time, Time)> = vec![(60, 180), (600, 720)];
assert_eq!(shouldbe, result);
let v: Vec<Time> = vec![0, 60, 120, 180, 240, 600, 660, 720];
let result = ranges(&v);
let shouldbe: Vec<(Time, Time)> = vec![(0, 240), (600, 720)];
assert_eq!(shouldbe, result);
}
#[test]
fn test_must_be_sequential() {
let mut candles = IndexMap::new();
for i in 1..10 {
let j = i * 60;
candles.insert(
j,
Candle {
start: j,
end: j + 60,
open: 0.,
close: 0.,
high: 0.,
low: 0.,
volume: 0.,
},
);
}
let c = TimeBars {
v: candles.clone(),
scale: 1,
};
assert!(c._test_epochs_must_be_sequential());
candles.insert(
10000,
Candle {
start: 10000,
end: 18000,
open: 0.,
close: 0.,
high: 0.,
low: 0.,
volume: 0.,
},
);
let g = TimeBars {
v: candles,
scale: 1,
};
assert!(!g._test_epochs_must_be_sequential());
}
#[test]
fn test_rebin() {
let mut candles = IndexMap::new();
let to_scale: usize = 5;
let upto: usize = 5;
for i in 1..(upto + 1) {
let j = i as Time * 60;
candles.insert(
j,
Candle {
start: j,
end: j + 60,
open: 0.,
close: 0.,
high: 0.,
low: 0.,
volume: 0.,
},
);
}
let c = TimeBars {
v: candles.clone(),
scale: 1,
};
println!("{:?}", c);
let rebinned = c.rebin(false, to_scale as u16).unwrap();
println!("{:?}", rebinned);
assert_eq!(rebinned.scale, to_scale as u16);
assert_eq!(rebinned.v.len(), upto / to_scale);
}
#[test]
fn should_have_right_attr() {
use super::super::*;
let mut candles = IndexMap::new();
let to_scale: usize = 5;
let upto: usize = 5;
for i in 1..(upto + 1) {
let j = i as Time * 60;
candles.insert(
j,
Candle {
start: j,
end: j+60,
open: 100. * i as Price,
close: 100. * i as Price,
high: i as Price,
low: i as Price,
volume: i as Price,
},
);
}
let c = TimeBars {
v: candles.clone(),
scale: 1,
};
println!("{:?}", c);
let rebinned = c.rebin(false, to_scale as u16).unwrap();
println!("{:?}", rebinned);
assert_eq!(rebinned.scale, to_scale as u16);
assert_eq!(rebinned.v.len(), upto / to_scale);
let mut i = 1;
for bin in rebinned.v.values() {
println!("{:?}", bin);
assert_eq!(bin.high, (i * to_scale) as Price);
assert_eq!(bin.open, (100 * (i - 1) * to_scale + 100) as Price);
assert_eq!(bin.close, 100. * (i * to_scale) as Price);
assert_eq!(
bin.volume,
(1 + (i - 1) * to_scale..(i * to_scale + 1)).fold(0, |a, b| a + b) as Price
);
i += 1;
}
}
}