1use std::time::Duration;
2
3use chrono::{DateTime, Utc};
4use rust_decimal::{
5 dec,
6 prelude::{FromPrimitive, ToPrimitive},
7 Decimal,
8};
9use serde::{Deserialize, Serialize};
10use tracing::warn;
11
12use crate::{
13 error::{BinaryOptionsError, BinaryOptionsResult},
14 pocketoption::error::{PocketError, PocketResult},
15};
16
17#[derive(Debug, Clone, Default, Serialize, Deserialize)]
22pub struct Candle {
23 pub symbol: String,
25 pub timestamp: i64,
27 pub open: Decimal,
29 pub high: Decimal,
31 pub low: Decimal,
33 pub close: Decimal,
35 pub volume: Option<Decimal>,
38 }
41
42#[derive(Debug, Default, Clone)]
43pub struct BaseCandle {
52 pub timestamp: i64,
53 pub open: f64,
54 pub close: f64,
55 pub high: f64,
56 pub low: f64,
57 pub volume: Option<f64>,
58}
59
60impl<'de> Deserialize<'de> for BaseCandle {
61 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
62 where
63 D: serde::Deserializer<'de>,
64 {
65 struct BaseCandleVisitor;
66
67 impl<'de> serde::de::Visitor<'de> for BaseCandleVisitor {
68 type Value = BaseCandle;
69
70 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
71 formatter.write_str("a sequence of 5 or 6 elements")
72 }
73
74 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
75 where
76 A: serde::de::SeqAccess<'de>,
77 {
78 let timestamp_raw: f64 = seq
79 .next_element()?
80 .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
81 let timestamp = timestamp_raw as i64;
82 let open = seq
83 .next_element()?
84 .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
85 let close = seq
86 .next_element()?
87 .ok_or_else(|| serde::de::Error::invalid_length(2, &self))?;
88 let high = seq
89 .next_element()?
90 .ok_or_else(|| serde::de::Error::invalid_length(3, &self))?;
91 let low = seq
92 .next_element()?
93 .ok_or_else(|| serde::de::Error::invalid_length(4, &self))?;
94 let volume: Option<Option<f64>> = seq.next_element()?;
95 let volume = volume.flatten();
96
97 Ok(BaseCandle {
98 timestamp,
99 open,
100 close,
101 high,
102 low,
103 volume,
104 })
105 }
106 }
107
108 deserializer.deserialize_seq(BaseCandleVisitor)
109 }
110}
111
112#[derive(serde::Deserialize, Debug, Clone)]
113#[serde(untagged)]
114pub enum HistoryItem {
115 Tick([serde_json::Value; 2]),
116 TickWithNull([serde_json::Value; 3]),
117}
118
119impl HistoryItem {
120 pub fn to_tick(&self) -> (i64, f64) {
121 match self {
122 HistoryItem::Tick([t, p]) => (
123 t.as_f64().unwrap_or_default() as i64,
124 p.as_f64().unwrap_or_default(),
125 ),
126 HistoryItem::TickWithNull([t, p, _]) => (
127 t.as_f64().unwrap_or_default() as i64,
128 p.as_f64().unwrap_or_default(),
129 ),
130 }
131 }
132}
133
134#[derive(serde::Deserialize, Debug, Clone)]
135pub struct CandleItem(pub f64, pub f64, pub f64, pub f64, pub f64, pub f64); impl Candle {
138 pub fn new(symbol: String, timestamp: i64, price: f64) -> BinaryOptionsResult<Self> {
148 let price = Decimal::from_f64(price).ok_or(BinaryOptionsError::General(
149 "Couldn't parse f64 to Decimal".to_string(),
150 ))?;
151 Ok(Self {
152 symbol,
153 timestamp,
154 open: price,
155 high: price,
156 low: price,
157 close: price,
158 volume: None, })
161 }
162
163 pub fn update_price(&mut self, price: f64) -> BinaryOptionsResult<()> {
171 let price = Decimal::from_f64(price).ok_or(BinaryOptionsError::General(
172 "Couldn't parse f64 to Decimal".to_string(),
173 ))?;
174 self.high = self.high.max(price);
175 self.low = self.low.min(price);
176 self.close = price;
177 Ok(())
178 }
179
180 pub fn update(&mut self, timestamp: i64, price: f64) -> BinaryOptionsResult<()> {
189 let price = Decimal::from_f64(price).ok_or(BinaryOptionsError::General(
190 "Couldn't parse f64 to Decimal".to_string(),
191 ))?;
192
193 self.high = self.high.max(price);
194 self.low = self.low.min(price);
195 self.close = price;
196 self.timestamp = timestamp;
197 Ok(())
198 }
199
200 pub fn price_range(&self) -> Decimal {
213 self.high - self.low
214 }
215
216 pub fn price_range_f64(&self) -> BinaryOptionsResult<f64> {
217 self.price_range()
218 .to_f64()
219 .ok_or(BinaryOptionsError::ParseDecimal(
220 "Couldn't parse Decimal to f64".to_string(),
221 ))
222 }
223 pub fn is_bullish(&self) -> bool {
228 self.close > self.open
229 }
230
231 pub fn is_bearish(&self) -> bool {
236 self.close < self.open
237 }
238
239 pub fn is_doji(&self) -> bool {
244 let body_size = (self.close - self.open).abs();
245 let range = self.price_range();
246
247 if range > dec!(0.0) {
249 body_size / range < dec!(0.1)
250 } else {
251 true }
253 }
254
255 pub fn body_size(&self) -> Decimal {
260 (self.close - self.open).abs()
261 }
262
263 pub fn body_size_f64(&self) -> BinaryOptionsResult<f64> {
268 self.body_size()
269 .to_f64()
270 .ok_or(BinaryOptionsError::ParseDecimal(
271 "Couldn't parse Decimal to f64".to_string(),
272 ))
273 }
274
275 pub fn upper_shadow(&self) -> Decimal {
280 self.high - self.open.max(self.close)
281 }
282
283 pub fn upper_shadow_f64(&self) -> BinaryOptionsResult<f64> {
288 self.upper_shadow()
289 .to_f64()
290 .ok_or(BinaryOptionsError::ParseDecimal(
291 "Couldn't parse Decimal to f64".to_string(),
292 ))
293 }
294
295 pub fn lower_shadow(&self) -> Decimal {
300 self.open.min(self.close) - self.low
301 }
302
303 pub fn lower_shadow_f64(&self) -> BinaryOptionsResult<f64> {
308 self.lower_shadow()
309 .to_f64()
310 .ok_or(BinaryOptionsError::ParseDecimal(
311 "Couldn't parse Decimal to f64".to_string(),
312 ))
313 }
314
315 pub fn datetime(&self) -> DateTime<Utc> {
320 DateTime::from_timestamp(self.timestamp, 0).unwrap_or_else(Utc::now)
321 }
322}
323
324#[derive(Clone, Debug)]
326pub enum SubscriptionType {
327 None,
328 Chunk {
329 size: usize, current: usize, candle: BaseCandle, },
333 Time {
334 start_time: Option<i64>,
335 duration: Duration,
336 candle: BaseCandle,
337 },
338 TimeAligned {
339 duration: Duration,
340 candle: BaseCandle,
341 next_boundary: Option<i64>,
343 },
344}
345
346impl BaseCandle {
347 pub fn new(
348 timestamp: i64,
349 open: f64,
350 high: f64,
351 low: f64,
352 close: f64,
353 volume: Option<f64>,
354 ) -> Self {
355 Self {
356 timestamp,
357 open,
358 high,
359 low,
360 close,
361 volume, }
363 }
364
365 pub fn timestamp(&self) -> DateTime<Utc> {
366 DateTime::from_timestamp(self.timestamp, 0).unwrap_or_else(Utc::now)
367 }
368}
369
370pub fn compile_candles_from_ticks(ticks: &[HistoryItem], period: u32, symbol: &str) -> Vec<Candle> {
382 if ticks.is_empty() || period == 0 {
383 return Vec::new();
384 }
385
386 let mut candles = Vec::new();
387 let period_i64 = period as i64;
388
389 let mut sorted_ticks: Vec<(i64, f64)> = ticks.iter().map(|t| t.to_tick()).collect();
391 sorted_ticks.sort_by(|a, b| a.0.cmp(&b.0));
392
393 let mut current_candle: Option<BaseCandle> = None;
394 let mut current_boundary_idx: Option<i64> = None;
395
396 for (timestamp, price) in sorted_ticks {
397 let boundary_idx = timestamp / period_i64;
398 let boundary = boundary_idx * period_i64;
399
400 if let Some(mut candle) = current_candle.take() {
401 if Some(boundary_idx) == current_boundary_idx {
402 candle.high = candle.high.max(price);
404 candle.low = candle.low.min(price);
405 candle.close = price;
406 current_candle = Some(candle);
407 } else {
408 match Candle::try_from((candle, symbol.to_string())) {
410 Ok(c) => candles.push(c),
411 Err(e) => warn!("Failed to convert final candle for {}: {}", symbol, e),
412 }
413 current_boundary_idx = Some(boundary_idx);
415 current_candle = Some(BaseCandle {
416 timestamp: boundary,
417 open: price,
418 high: price,
419 low: price,
420 close: price,
421 volume: None,
422 });
423 }
424 } else {
425 current_boundary_idx = Some(boundary_idx);
427 current_candle = Some(BaseCandle {
428 timestamp: boundary,
429 open: price,
430 high: price,
431 low: price,
432 close: price,
433 volume: None,
434 });
435 }
436 }
437
438 if let Some(candle) = current_candle {
439 match Candle::try_from((candle, symbol.to_string())) {
440 Ok(c) => candles.push(c),
441 Err(e) => warn!("Failed to convert final candle for {}: {}", symbol, e),
442 }
443 }
444
445 candles
446}
447
448impl SubscriptionType {
449 pub fn none() -> Self {
450 SubscriptionType::None
451 }
452
453 pub fn chunk(size: usize) -> Self {
454 SubscriptionType::Chunk {
455 size,
456 current: 0,
457 candle: BaseCandle::default(),
458 }
459 }
460
461 pub fn time(duration: Duration) -> Self {
462 SubscriptionType::Time {
463 start_time: None,
464 duration,
465 candle: BaseCandle::default(),
466 }
467 }
468
469 pub fn time_aligned(duration: Duration) -> PocketResult<Self> {
473 if 24 * 60 * 60 % duration.as_secs() != 0 {
474 warn!(
475 "Unsupported duration for time-aligned subscription: {:?}",
476 duration
477 );
478 return Err(PocketError::General(format!(
479 "Unsupported duration for time-aligned subscription: {duration:?}, duration should be a multiple of the number of seconds in a day"
480 )));
481 }
482 Ok(SubscriptionType::TimeAligned {
483 duration,
484 candle: BaseCandle::default(),
485 next_boundary: None,
486 })
487 }
488
489 pub fn period_secs(&self) -> Option<u32> {
490 match self {
491 SubscriptionType::Time { duration, .. } => Some(duration.as_secs() as u32),
492 SubscriptionType::TimeAligned { duration, .. } => Some(duration.as_secs() as u32),
493 _ => None,
494 }
495 }
496
497 pub fn update(&mut self, new_candle: &BaseCandle) -> PocketResult<Option<BaseCandle>> {
498 match self {
499 SubscriptionType::None => Ok(Some(new_candle.clone())),
500
501 SubscriptionType::Chunk {
502 size,
503 current,
504 candle,
505 } => {
506 if *current == 0 {
507 *candle = new_candle.clone();
508 } else {
509 candle.timestamp = new_candle.timestamp;
510 candle.high = candle.high.max(new_candle.high);
511 candle.low = candle.low.min(new_candle.low);
512 candle.close = new_candle.close;
513 }
514 *current += 1;
515
516 if *current >= *size {
517 *current = 0; Ok(Some(candle.clone()))
519 } else {
520 Ok(None)
521 }
522 }
523
524 SubscriptionType::Time {
525 start_time,
526 duration,
527 candle,
528 } => {
529 if start_time.is_none() {
530 *start_time = Some(new_candle.timestamp);
531 *candle = new_candle.clone();
532 return Ok(None);
533 }
534
535 candle.timestamp = new_candle.timestamp;
537 candle.high = candle.high.max(new_candle.high);
538 candle.low = candle.low.min(new_candle.low);
539 candle.close = new_candle.close;
540
541 let elapsed = (new_candle.timestamp()
542 - DateTime::from_timestamp(start_time.unwrap(), 0).unwrap_or_else(Utc::now))
543 .to_std()
544 .map_err(|_| {
545 PocketError::General("Time calculation error in conditional update".to_string())
546 })?;
547
548 if elapsed >= *duration {
549 *start_time = None; Ok(Some(candle.clone()))
551 } else {
552 Ok(None)
553 }
554 }
555
556 SubscriptionType::TimeAligned {
557 duration,
558 candle,
559 next_boundary,
560 } => {
561 let boundary = match *next_boundary {
562 Some(b) => b,
563 None => {
564 *candle = new_candle.clone();
566 let duration_secs = duration.as_secs() as i64;
567 let bucket_id = new_candle.timestamp / duration_secs;
568 let new_boundary = (bucket_id + 1) * duration_secs;
569 *next_boundary = Some(new_boundary);
570
571 return Ok(None);
573 }
574 };
575
576 if new_candle.timestamp < boundary {
577 candle.high = candle.high.max(new_candle.high);
579 candle.low = candle.low.min(new_candle.low);
580 candle.close = new_candle.close;
581 candle.timestamp = new_candle.timestamp;
582 if let (Some(v_agg), Some(v_new)) = (&mut candle.volume, new_candle.volume) {
583 *v_agg += v_new;
584 } else if new_candle.volume.is_some() {
585 candle.volume = new_candle.volume;
586 }
587 Ok(None) } else {
589 let duration_secs = duration.as_secs() as i64;
593 candle.timestamp = boundary - duration_secs;
594 let completed_candle = candle.clone();
596
597 *candle = new_candle.clone();
599
600 let bucket_id = new_candle.timestamp / duration_secs;
602 let new_boundary = (bucket_id + 1) * duration_secs;
603 *next_boundary = Some(new_boundary);
604
605 Ok(Some(completed_candle))
607 }
608 }
609 }
610 }
611}
612
613impl From<(i64, f64)> for BaseCandle {
614 fn from((timestamp, price): (i64, f64)) -> Self {
615 BaseCandle {
616 timestamp,
617 open: price,
618 high: price,
619 low: price,
620 close: price,
621 volume: None, }
623 }
624}
625
626impl TryFrom<(BaseCandle, String)> for Candle {
627 type Error = BinaryOptionsError;
628
629 fn try_from(value: (BaseCandle, String)) -> Result<Self, Self::Error> {
630 let (base_candle, symbol) = value;
631 let volume = match base_candle.volume {
632 Some(v) => Some(
633 Decimal::from_f64(v)
634 .ok_or(BinaryOptionsError::General("Couldn't parse volume".into()))?,
635 ),
636 None => None,
637 };
638 Ok(Candle {
639 symbol,
640 timestamp: base_candle.timestamp,
641 open: Decimal::from_f64(base_candle.open)
642 .ok_or(BinaryOptionsError::General("Couldn't parse open".into()))?,
643 high: Decimal::from_f64(base_candle.high)
644 .ok_or(BinaryOptionsError::General("Couldn't parse high".into()))?,
645 low: Decimal::from_f64(base_candle.low)
646 .ok_or(BinaryOptionsError::General("Couldn't parse low".into()))?,
647 close: Decimal::from_f64(base_candle.close)
648 .ok_or(BinaryOptionsError::General("Couldn't parse close".into()))?,
649 volume,
650 })
651 }
652}
653
654#[cfg(test)]
655mod tests {
656 use super::*;
657
658 #[test]
659 fn test_parse_base_candles() {
660 let data = r#"[1754529180,0.92124,0.92155,0.92162,0.92124]"#;
662 let candle: BaseCandle = serde_json::from_str(data).unwrap();
663 assert_eq!(candle.timestamp, 1754529180);
664 assert_eq!(candle.open, 0.92124);
665 assert_eq!(candle.close, 0.92155);
666 assert_eq!(candle.high, 0.92162);
667 assert_eq!(candle.low, 0.92124);
668 assert_eq!(candle.volume, None);
669 }
670
671 #[test]
672 fn test_parse_base_candles_with_volume() {
673 let data = r#"[1754529180,0.92124,0.92155,0.92162,0.92124,100.0]"#;
675 let candle: BaseCandle = serde_json::from_str(data).unwrap();
676 assert_eq!(candle.volume, Some(100.0));
677 }
678
679 #[test]
680 fn test_parse_base_candles_with_null_volume() {
681 let data = r#"[1754529180,0.92124,0.92155,0.92162,0.92124,null]"#;
683 let candle: BaseCandle = serde_json::from_str(data).unwrap();
684 assert_eq!(candle.volume, None);
685 }
686
687 #[test]
688 fn test_compile_candles_zero_period() {
689 let ticks = vec![
690 HistoryItem::Tick([1000.into(), 1.0.into()]),
691 HistoryItem::Tick([1001.into(), 1.1.into()]),
692 ];
693 let candles = compile_candles_from_ticks(&ticks, 0, "TEST");
694 assert!(candles.is_empty());
695 }
696
697 #[test]
698 fn test_compile_candles_empty_ticks() {
699 let ticks = vec![];
700 let candles = compile_candles_from_ticks(&ticks, 60, "TEST");
701 assert!(candles.is_empty());
702 }
703
704 #[test]
705 fn test_compile_candles_single_tick() {
706 let ticks = vec![HistoryItem::Tick([1000.into(), 1.5.into()])];
707 let candles = compile_candles_from_ticks(&ticks, 60, "TEST");
708 assert_eq!(candles.len(), 1);
709 let c = &candles[0];
710 assert_eq!(c.timestamp, 960);
713 assert_eq!(c.open.to_string(), "1.5");
714 assert_eq!(c.high.to_string(), "1.5");
715 assert_eq!(c.low.to_string(), "1.5");
716 assert_eq!(c.close.to_string(), "1.5");
717 }
718}