use std::time::Duration;
use chrono::{DateTime, Utc};
use rust_decimal::{
dec,
prelude::{FromPrimitive, ToPrimitive},
Decimal,
};
use serde::{Deserialize, Serialize};
use tracing::warn;
use crate::{
error::{BinaryOptionsError, BinaryOptionsResult},
pocketoption::error::{PocketError, PocketResult},
};
/// A price candle for a single symbol with open/high/low/close stored as `Decimal`.
///
/// Serializable and deserializable through the derived serde impls.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Candle {
/// Symbol this candle belongs to.
pub symbol: String,
/// Candle time; `Candle::datetime` interprets it as whole seconds since the Unix epoch.
pub timestamp: i64,
/// First price of the candle.
pub open: Decimal,
/// Highest price seen so far.
pub high: Decimal,
/// Lowest price seen so far.
pub low: Decimal,
/// Most recent price.
pub close: Decimal,
/// Volume, when available (`Candle::new` leaves it as `None`).
pub volume: Option<Decimal>,
}
/// Lightweight candle that keeps its prices as plain `f64` values.
///
/// It is deserialized by the hand-written `Deserialize` impl in this file (from a
/// sequence, not a map) and can be converted into a `Candle` with
/// `Candle::try_from((base_candle, symbol))`.
#[derive(Debug, Default, Clone)]
pub struct BaseCandle {
/// Candle time; `BaseCandle::timestamp()` interprets it as whole seconds since the Unix epoch.
pub timestamp: i64,
/// First price of the candle.
pub open: f64,
/// Most recent price.
pub close: f64,
/// Highest price.
pub high: f64,
/// Lowest price.
pub low: f64,
/// Volume, when the source provided one.
pub volume: Option<f64>,
}
impl<'de> Deserialize<'de> for BaseCandle {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct BaseCandleVisitor;
impl<'de> serde::de::Visitor<'de> for BaseCandleVisitor {
type Value = BaseCandle;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a sequence of 5 or 6 elements")
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let timestamp_raw: f64 = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
let timestamp = timestamp_raw as i64;
let open = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
let close = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(2, &self))?;
let high = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(3, &self))?;
let low = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(4, &self))?;
let volume: Option<Option<f64>> = seq.next_element()?;
let volume = volume.flatten();
Ok(BaseCandle {
timestamp,
open,
close,
high,
low,
volume,
})
}
}
deserializer.deserialize_seq(BaseCandleVisitor)
}
}
/// One raw history entry, deserialized untagged from a JSON array.
///
/// serde tries the variants in declaration order: a two-element array becomes
/// `Tick`, a three-element array becomes `TickWithNull`.
#[derive(serde::Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum HistoryItem {
/// `[timestamp, price]`, as read by `to_tick`.
Tick([serde_json::Value; 2]),
/// `[timestamp, price, extra]`; `to_tick` ignores the third element.
/// NOTE(review): the name suggests the third element is `null` — confirm against the data source.
TickWithNull([serde_json::Value; 3]),
}
impl HistoryItem {
    /// Extracts `(timestamp, price)` from the entry.
    ///
    /// Both values are read with `as_f64`; a value that is not a number becomes `0`
    /// (timestamp) or `0.0` (price). The timestamp is cast from `f64` to `i64`, which
    /// drops any fractional part.
    pub fn to_tick(&self) -> (i64, f64) {
        // Both variants carry the timestamp and the price in the first two slots.
        let (raw_time, raw_price) = match self {
            HistoryItem::Tick([time, price]) => (time, price),
            HistoryItem::TickWithNull([time, price, _]) => (time, price),
        };
        let timestamp = raw_time.as_f64().unwrap_or(0.0) as i64;
        let price = raw_price.as_f64().unwrap_or(0.0);
        (timestamp, price)
    }
}
/// Tuple of six `f64` values with a derived `Deserialize` impl.
///
/// NOTE(review): nothing in this file shows what each position means; presumably a raw
/// candle row from the server — confirm the field order against the producer.
#[derive(serde::Deserialize, Debug, Clone)]
pub struct CandleItem(pub f64, pub f64, pub f64, pub f64, pub f64, pub f64);
impl Candle {
    /// Converts a raw `f64` price into a [`Decimal`].
    ///
    /// The error is built lazily, so the success path performs no allocation.
    fn decimal_from_price(price: f64) -> BinaryOptionsResult<Decimal> {
        Decimal::from_f64(price).ok_or_else(|| {
            BinaryOptionsError::General("Couldn't parse f64 to Decimal".to_string())
        })
    }

    /// Converts a [`Decimal`] into an `f64`, reporting `ParseDecimal` on failure.
    fn decimal_to_f64(value: Decimal) -> BinaryOptionsResult<f64> {
        value.to_f64().ok_or_else(|| {
            BinaryOptionsError::ParseDecimal("Couldn't parse Decimal to f64".to_string())
        })
    }

    /// Creates a candle whose open, high, low and close all equal `price`.
    ///
    /// `volume` starts as `None`.
    ///
    /// # Errors
    /// Returns `BinaryOptionsError::General` when `price` cannot be represented as a
    /// `Decimal`.
    pub fn new(symbol: String, timestamp: i64, price: f64) -> BinaryOptionsResult<Self> {
        let price = Self::decimal_from_price(price)?;
        Ok(Self {
            symbol,
            timestamp,
            open: price,
            high: price,
            low: price,
            close: price,
            volume: None,
        })
    }

    /// Folds a new price into the candle: widens high/low and sets close.
    ///
    /// # Errors
    /// Returns `BinaryOptionsError::General` when `price` cannot be represented as a
    /// `Decimal`; the candle is left untouched in that case.
    pub fn update_price(&mut self, price: f64) -> BinaryOptionsResult<()> {
        let price = Self::decimal_from_price(price)?;
        self.high = self.high.max(price);
        self.low = self.low.min(price);
        self.close = price;
        Ok(())
    }

    /// Same as [`Candle::update_price`], and additionally sets the candle timestamp.
    ///
    /// # Errors
    /// Returns `BinaryOptionsError::General` when `price` cannot be represented as a
    /// `Decimal`; neither the prices nor the timestamp are changed in that case.
    pub fn update(&mut self, timestamp: i64, price: f64) -> BinaryOptionsResult<()> {
        // Convert first so a failed conversion never leaves a half-updated candle.
        self.update_price(price)?;
        self.timestamp = timestamp;
        Ok(())
    }

    /// Distance between high and low.
    pub fn price_range(&self) -> Decimal {
        self.high - self.low
    }

    /// [`Candle::price_range`] as `f64`.
    ///
    /// # Errors
    /// Returns `BinaryOptionsError::ParseDecimal` when the value cannot be converted.
    pub fn price_range_f64(&self) -> BinaryOptionsResult<f64> {
        Self::decimal_to_f64(self.price_range())
    }

    /// `true` when the close is strictly above the open.
    pub fn is_bullish(&self) -> bool {
        self.close > self.open
    }

    /// `true` when the close is strictly below the open.
    pub fn is_bearish(&self) -> bool {
        self.close < self.open
    }

    /// `true` when the body is less than 10% of the high-low range.
    ///
    /// A candle whose range is not positive is always reported as a doji.
    pub fn is_doji(&self) -> bool {
        let range = self.price_range();
        if range > dec!(0.0) {
            self.body_size() / range < dec!(0.1)
        } else {
            true
        }
    }

    /// Absolute distance between open and close.
    pub fn body_size(&self) -> Decimal {
        (self.close - self.open).abs()
    }

    /// [`Candle::body_size`] as `f64`.
    ///
    /// # Errors
    /// Returns `BinaryOptionsError::ParseDecimal` when the value cannot be converted.
    pub fn body_size_f64(&self) -> BinaryOptionsResult<f64> {
        Self::decimal_to_f64(self.body_size())
    }

    /// Distance from the top of the body (the larger of open/close) up to the high.
    pub fn upper_shadow(&self) -> Decimal {
        self.high - self.open.max(self.close)
    }

    /// [`Candle::upper_shadow`] as `f64`.
    ///
    /// # Errors
    /// Returns `BinaryOptionsError::ParseDecimal` when the value cannot be converted.
    pub fn upper_shadow_f64(&self) -> BinaryOptionsResult<f64> {
        Self::decimal_to_f64(self.upper_shadow())
    }

    /// Distance from the low up to the bottom of the body (the smaller of open/close).
    pub fn lower_shadow(&self) -> Decimal {
        self.open.min(self.close) - self.low
    }

    /// [`Candle::lower_shadow`] as `f64`.
    ///
    /// # Errors
    /// Returns `BinaryOptionsError::ParseDecimal` when the value cannot be converted.
    pub fn lower_shadow_f64(&self) -> BinaryOptionsResult<f64> {
        Self::decimal_to_f64(self.lower_shadow())
    }

    /// The candle timestamp (whole seconds since the Unix epoch) as a UTC date-time.
    ///
    /// Falls back to the current time when the timestamp is outside the range chrono
    /// can represent.
    pub fn datetime(&self) -> DateTime<Utc> {
        DateTime::from_timestamp(self.timestamp, 0).unwrap_or_else(Utc::now)
    }
}
/// Strategy that `SubscriptionType::update` uses to turn a stream of incoming candles
/// into emitted candles. Each aggregating variant carries its own working state.
#[derive(Clone, Debug)]
pub enum SubscriptionType {
/// No aggregation: `update` returns a copy of every incoming candle.
None,
/// Count-based aggregation: `update` emits the working candle once `current` reaches
/// `size`, then resets `current` to 0.
Chunk {
// `size`: updates per emitted candle; `current`: updates merged so far;
// `candle`: working aggregate.
size: usize, current: usize, candle: BaseCandle, },
/// Elapsed-time aggregation measured from the first candle of each window.
Time {
/// Timestamp of the candle that opened the current window; `None` when no window is open.
start_time: Option<i64>,
/// Window length; the working candle is emitted once at least this much time has elapsed.
duration: Duration,
/// Working aggregate for the current window.
candle: BaseCandle,
},
/// Aggregation into windows whose boundaries are multiples of `duration` (in seconds).
TimeAligned {
/// Window length; `update` uses only its whole-second part (`as_secs`).
duration: Duration,
/// Working aggregate for the current window.
candle: BaseCandle,
/// First timestamp that no longer belongs to the current window; `None` until the
/// first update arrives.
next_boundary: Option<i64>,
},
}
impl BaseCandle {
pub fn new(
timestamp: i64,
open: f64,
high: f64,
low: f64,
close: f64,
volume: Option<f64>,
) -> Self {
Self {
timestamp,
open,
high,
low,
close,
volume, }
}
pub fn timestamp(&self) -> DateTime<Utc> {
DateTime::from_timestamp(self.timestamp, 0).unwrap_or_else(Utc::now)
}
}
pub fn compile_candles_from_ticks(ticks: &[HistoryItem], period: u32, symbol: &str) -> Vec<Candle> {
if ticks.is_empty() || period == 0 {
return Vec::new();
}
let mut candles = Vec::new();
let period_i64 = period as i64;
let mut sorted_ticks: Vec<(i64, f64)> = ticks.iter().map(|t| t.to_tick()).collect();
sorted_ticks.sort_by(|a, b| a.0.cmp(&b.0));
let mut current_candle: Option<BaseCandle> = None;
let mut current_boundary_idx: Option<i64> = None;
for (timestamp, price) in sorted_ticks {
let boundary_idx = timestamp / period_i64;
let boundary = boundary_idx * period_i64;
if let Some(mut candle) = current_candle.take() {
if Some(boundary_idx) == current_boundary_idx {
candle.high = candle.high.max(price);
candle.low = candle.low.min(price);
candle.close = price;
current_candle = Some(candle);
} else {
match Candle::try_from((candle, symbol.to_string())) {
Ok(c) => candles.push(c),
Err(e) => warn!("Failed to convert final candle for {}: {}", symbol, e),
}
current_boundary_idx = Some(boundary_idx);
current_candle = Some(BaseCandle {
timestamp: boundary,
open: price,
high: price,
low: price,
close: price,
volume: None,
});
}
} else {
current_boundary_idx = Some(boundary_idx);
current_candle = Some(BaseCandle {
timestamp: boundary,
open: price,
high: price,
low: price,
close: price,
volume: None,
});
}
}
if let Some(candle) = current_candle {
match Candle::try_from((candle, symbol.to_string())) {
Ok(c) => candles.push(c),
Err(e) => warn!("Failed to convert final candle for {}: {}", symbol, e),
}
}
candles
}
impl SubscriptionType {
pub fn none() -> Self {
SubscriptionType::None
}
pub fn chunk(size: usize) -> Self {
SubscriptionType::Chunk {
size,
current: 0,
candle: BaseCandle::default(),
}
}
pub fn time(duration: Duration) -> Self {
SubscriptionType::Time {
start_time: None,
duration,
candle: BaseCandle::default(),
}
}
pub fn time_aligned(duration: Duration) -> PocketResult<Self> {
if 24 * 60 * 60 % duration.as_secs() != 0 {
warn!(
"Unsupported duration for time-aligned subscription: {:?}",
duration
);
return Err(PocketError::General(format!(
"Unsupported duration for time-aligned subscription: {duration:?}, duration should be a multiple of the number of seconds in a day"
)));
}
Ok(SubscriptionType::TimeAligned {
duration,
candle: BaseCandle::default(),
next_boundary: None,
})
}
pub fn period_secs(&self) -> Option<u32> {
match self {
SubscriptionType::Time { duration, .. } => Some(duration.as_secs() as u32),
SubscriptionType::TimeAligned { duration, .. } => Some(duration.as_secs() as u32),
_ => None,
}
}
pub fn update(&mut self, new_candle: &BaseCandle) -> PocketResult<Option<BaseCandle>> {
match self {
SubscriptionType::None => Ok(Some(new_candle.clone())),
SubscriptionType::Chunk {
size,
current,
candle,
} => {
if *current == 0 {
*candle = new_candle.clone();
} else {
candle.timestamp = new_candle.timestamp;
candle.high = candle.high.max(new_candle.high);
candle.low = candle.low.min(new_candle.low);
candle.close = new_candle.close;
}
*current += 1;
if *current >= *size {
*current = 0; Ok(Some(candle.clone()))
} else {
Ok(None)
}
}
SubscriptionType::Time {
start_time,
duration,
candle,
} => {
if start_time.is_none() {
*start_time = Some(new_candle.timestamp);
*candle = new_candle.clone();
return Ok(None);
}
candle.timestamp = new_candle.timestamp;
candle.high = candle.high.max(new_candle.high);
candle.low = candle.low.min(new_candle.low);
candle.close = new_candle.close;
let elapsed = (new_candle.timestamp()
- DateTime::from_timestamp(start_time.unwrap(), 0).unwrap_or_else(Utc::now))
.to_std()
.map_err(|_| {
PocketError::General("Time calculation error in conditional update".to_string())
})?;
if elapsed >= *duration {
*start_time = None; Ok(Some(candle.clone()))
} else {
Ok(None)
}
}
SubscriptionType::TimeAligned {
duration,
candle,
next_boundary,
} => {
let boundary = match *next_boundary {
Some(b) => b,
None => {
*candle = new_candle.clone();
let duration_secs = duration.as_secs() as i64;
let bucket_id = new_candle.timestamp / duration_secs;
let new_boundary = (bucket_id + 1) * duration_secs;
*next_boundary = Some(new_boundary);
return Ok(None);
}
};
if new_candle.timestamp < boundary {
candle.high = candle.high.max(new_candle.high);
candle.low = candle.low.min(new_candle.low);
candle.close = new_candle.close;
candle.timestamp = new_candle.timestamp;
if let (Some(v_agg), Some(v_new)) = (&mut candle.volume, new_candle.volume) {
*v_agg += v_new;
} else if new_candle.volume.is_some() {
candle.volume = new_candle.volume;
}
Ok(None) } else {
let duration_secs = duration.as_secs() as i64;
candle.timestamp = boundary - duration_secs;
let completed_candle = candle.clone();
*candle = new_candle.clone();
let bucket_id = new_candle.timestamp / duration_secs;
let new_boundary = (bucket_id + 1) * duration_secs;
*next_boundary = Some(new_boundary);
Ok(Some(completed_candle))
}
}
}
}
}
impl From<(i64, f64)> for BaseCandle {
fn from((timestamp, price): (i64, f64)) -> Self {
BaseCandle {
timestamp,
open: price,
high: price,
low: price,
close: price,
volume: None, }
}
}
/// Converts an `f64`-based [`BaseCandle`] plus its symbol into a `Decimal`-based
/// [`Candle`].
///
/// Fails with `BinaryOptionsError::General("Couldn't parse <field>")` for the first
/// field (checked in the order volume, open, high, low, close) whose value cannot be
/// represented as a `Decimal`.
impl TryFrom<(BaseCandle, String)> for Candle {
    type Error = BinaryOptionsError;

    fn try_from(value: (BaseCandle, String)) -> Result<Self, Self::Error> {
        let (base_candle, symbol) = value;

        // The error message is only built when a conversion actually fails, so the
        // success path does not allocate.
        let to_decimal = |raw: f64, field: &str| {
            Decimal::from_f64(raw)
                .ok_or_else(|| BinaryOptionsError::General(format!("Couldn't parse {field}")))
        };

        let volume = base_candle
            .volume
            .map(|v| to_decimal(v, "volume"))
            .transpose()?;

        Ok(Candle {
            symbol,
            timestamp: base_candle.timestamp,
            open: to_decimal(base_candle.open, "open")?,
            high: to_decimal(base_candle.high, "high")?,
            low: to_decimal(base_candle.low, "low")?,
            close: to_decimal(base_candle.close, "close")?,
            volume,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a JSON array into a `BaseCandle`, panicking on malformed input.
    fn parse_row(json: &str) -> BaseCandle {
        serde_json::from_str(json).unwrap()
    }

    /// Builds a two-element history tick.
    fn tick(timestamp: i64, price: f64) -> HistoryItem {
        HistoryItem::Tick([timestamp.into(), price.into()])
    }

    // A five-element row leaves the volume unset.
    #[test]
    fn test_parse_base_candles() {
        let parsed = parse_row(r#"[1754529180,0.92124,0.92155,0.92162,0.92124]"#);
        assert_eq!(parsed.timestamp, 1754529180);
        assert_eq!(parsed.open, 0.92124);
        assert_eq!(parsed.close, 0.92155);
        assert_eq!(parsed.high, 0.92162);
        assert_eq!(parsed.low, 0.92124);
        assert_eq!(parsed.volume, None);
    }

    // A numeric sixth element is read as the volume.
    #[test]
    fn test_parse_base_candles_with_volume() {
        let parsed = parse_row(r#"[1754529180,0.92124,0.92155,0.92162,0.92124,100.0]"#);
        assert_eq!(parsed.volume, Some(100.0));
    }

    // A `null` sixth element is treated like a missing volume.
    #[test]
    fn test_parse_base_candles_with_null_volume() {
        let parsed = parse_row(r#"[1754529180,0.92124,0.92155,0.92162,0.92124,null]"#);
        assert_eq!(parsed.volume, None);
    }

    // A zero period yields no candles.
    #[test]
    fn test_compile_candles_zero_period() {
        let ticks = vec![tick(1000, 1.0), tick(1001, 1.1)];
        assert!(compile_candles_from_ticks(&ticks, 0, "TEST").is_empty());
    }

    // No ticks, no candles.
    #[test]
    fn test_compile_candles_empty_ticks() {
        let ticks: Vec<HistoryItem> = Vec::new();
        assert!(compile_candles_from_ticks(&ticks, 60, "TEST").is_empty());
    }

    // A single tick produces one flat candle stamped with the start of its bucket.
    #[test]
    fn test_compile_candles_single_tick() {
        let ticks = vec![tick(1000, 1.5)];
        let candles = compile_candles_from_ticks(&ticks, 60, "TEST");
        assert_eq!(candles.len(), 1);

        let only = &candles[0];
        assert_eq!(only.timestamp, 960);
        for price in [&only.open, &only.high, &only.low, &only.close] {
            assert_eq!(price.to_string(), "1.5");
        }
    }
}