Skip to main content

binary_options_tools/pocketoption/
candle.rs

1use std::time::Duration;
2
3use chrono::{DateTime, Utc};
4use rust_decimal::{
5    dec,
6    prelude::{FromPrimitive, ToPrimitive},
7    Decimal,
8};
9use serde::{Deserialize, Serialize};
10use tracing::warn;
11
12use crate::{
13    error::{BinaryOptionsError, BinaryOptionsResult},
14    pocketoption::error::{PocketError, PocketResult},
15};
16
17/// Candle data structure for PocketOption price data
18///
19/// This represents OHLC (Open, High, Low, Close) price data for a specific time period.
20/// Note: PocketOption doesn't provide volume data, so the volume field is always None.
21#[derive(Debug, Clone, Default, Serialize, Deserialize)]
22pub struct Candle {
23    /// Trading symbol (e.g., "EURUSD_otc")
24    pub symbol: String,
25    /// Unix timestamp of the candle start time
26    pub timestamp: i64,
27    /// Opening price
28    pub open: Decimal,
29    /// Highest price in the candle period
30    pub high: Decimal,
31    /// Lowest price in the candle period
32    pub low: Decimal,
33    /// Closing price
34    pub close: Decimal,
35    /// Volume is not provided by PocketOption
36    // #[serde(skip_serializing_if = "Option::is_none")]
37    pub volume: Option<Decimal>,
38    // /// Whether this candle is closed/finalized
39    // pub is_closed: bool,
40}
41
42#[derive(Debug, Default, Clone)]
43/// Base candle structure matching the server's data format.
44///
45/// The field order matches the server's JSON array format: `[timestamp, open, close, high, low]`.
46///
47/// # Example JSON
48/// ```json
49/// [1754529180, 0.92124, 0.92155, 0.92162, 0.92124]
50/// ```
51pub struct BaseCandle {
52    pub timestamp: i64,
53    pub open: f64,
54    pub close: f64,
55    pub high: f64,
56    pub low: f64,
57    pub volume: Option<f64>,
58}
59
60impl<'de> Deserialize<'de> for BaseCandle {
61    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
62    where
63        D: serde::Deserializer<'de>,
64    {
65        struct BaseCandleVisitor;
66
67        impl<'de> serde::de::Visitor<'de> for BaseCandleVisitor {
68            type Value = BaseCandle;
69
70            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
71                formatter.write_str("a sequence of 5 or 6 elements")
72            }
73
74            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
75            where
76                A: serde::de::SeqAccess<'de>,
77            {
78                let timestamp_raw: f64 = seq
79                    .next_element()?
80                    .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
81                let timestamp = timestamp_raw as i64;
82                let open = seq
83                    .next_element()?
84                    .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
85                let close = seq
86                    .next_element()?
87                    .ok_or_else(|| serde::de::Error::invalid_length(2, &self))?;
88                let high = seq
89                    .next_element()?
90                    .ok_or_else(|| serde::de::Error::invalid_length(3, &self))?;
91                let low = seq
92                    .next_element()?
93                    .ok_or_else(|| serde::de::Error::invalid_length(4, &self))?;
94                let volume: Option<Option<f64>> = seq.next_element()?;
95                let volume = volume.flatten();
96
97                Ok(BaseCandle {
98                    timestamp,
99                    open,
100                    close,
101                    high,
102                    low,
103                    volume,
104                })
105            }
106        }
107
108        deserializer.deserialize_seq(BaseCandleVisitor)
109    }
110}
111
112#[derive(serde::Deserialize, Debug, Clone)]
113#[serde(untagged)]
114pub enum HistoryItem {
115    Tick([serde_json::Value; 2]),
116    TickWithNull([serde_json::Value; 3]),
117}
118
119impl HistoryItem {
120    pub fn to_tick(&self) -> (i64, f64) {
121        match self {
122            HistoryItem::Tick([t, p]) => (
123                t.as_f64().unwrap_or_default() as i64,
124                p.as_f64().unwrap_or_default(),
125            ),
126            HistoryItem::TickWithNull([t, p, _]) => (
127                t.as_f64().unwrap_or_default() as i64,
128                p.as_f64().unwrap_or_default(),
129            ),
130        }
131    }
132}
133
134#[derive(serde::Deserialize, Debug, Clone)]
135pub struct CandleItem(pub f64, pub f64, pub f64, pub f64, pub f64, pub f64); // timestamp, open, close, high, low, volume
136
137impl Candle {
138    /// Create a new candle with initial price
139    ///
140    /// # Arguments
141    /// * `symbol` - Trading symbol
142    /// * `timestamp` - Unix timestamp for the candle start
143    /// * `price` - Initial price (used for open, high, low, close)
144    ///
145    /// # Returns
146    /// New Candle instance with all OHLC values set to the initial price
147    pub fn new(symbol: String, timestamp: i64, price: f64) -> BinaryOptionsResult<Self> {
148        let price = Decimal::from_f64(price).ok_or(BinaryOptionsError::General(
149            "Couldn't parse f64 to Decimal".to_string(),
150        ))?;
151        Ok(Self {
152            symbol,
153            timestamp,
154            open: price,
155            high: price,
156            low: price,
157            close: price,
158            volume: None, // PocketOption doesn't provide volume
159                          // is_closed: false,
160        })
161    }
162
163    /// Update the candle with a new price
164    ///
165    /// This method updates the high, low, and close prices while maintaining
166    /// the open price from the initial candle creation.
167    ///
168    /// # Arguments
169    /// * `price` - New price to incorporate into the candle
170    pub fn update_price(&mut self, price: f64) -> BinaryOptionsResult<()> {
171        let price = Decimal::from_f64(price).ok_or(BinaryOptionsError::General(
172            "Couldn't parse f64 to Decimal".to_string(),
173        ))?;
174        self.high = self.high.max(price);
175        self.low = self.low.min(price);
176        self.close = price;
177        Ok(())
178    }
179
180    /// Update the candle with a new timestamp and price
181    ///
182    /// This method updates the high, low, and close prices while maintaining
183    /// the open price from the initial candle creation.
184    ///
185    /// # Arguments
186    /// * `timestamp` - New timestamp for the candle
187    /// * `price` - New price to incorporate into the candle
188    pub fn update(&mut self, timestamp: i64, price: f64) -> BinaryOptionsResult<()> {
189        let price = Decimal::from_f64(price).ok_or(BinaryOptionsError::General(
190            "Couldn't parse f64 to Decimal".to_string(),
191        ))?;
192
193        self.high = self.high.max(price);
194        self.low = self.low.min(price);
195        self.close = price;
196        self.timestamp = timestamp;
197        Ok(())
198    }
199
200    // /// Mark the candle as closed/finalized
201    // ///
202    // /// Once a candle is closed, it should not be updated with new prices.
203    // /// This is typically called when a time-based candle period ends.
204    // pub fn close_candle(&mut self) {
205    //     self.is_closed = true;
206    // }
207
208    /// Get the price range (high - low) of the candle
209    ///
210    /// # Returns
211    /// Price range as Decimal
212    pub fn price_range(&self) -> Decimal {
213        self.high - self.low
214    }
215
216    pub fn price_range_f64(&self) -> BinaryOptionsResult<f64> {
217        self.price_range()
218            .to_f64()
219            .ok_or(BinaryOptionsError::ParseDecimal(
220                "Couldn't parse Decimal to f64".to_string(),
221            ))
222    }
223    /// Check if the candle is bullish (close > open)
224    ///
225    /// # Returns
226    /// True if the candle closed higher than it opened
227    pub fn is_bullish(&self) -> bool {
228        self.close > self.open
229    }
230
231    /// Check if the candle is bearish (close < open)
232    ///
233    /// # Returns
234    /// True if the candle closed lower than it opened
235    pub fn is_bearish(&self) -> bool {
236        self.close < self.open
237    }
238
239    /// Check if the candle is a doji (close ≈ open)
240    ///
241    /// # Returns
242    /// True if the candle has very little price movement
243    pub fn is_doji(&self) -> bool {
244        let body_size = (self.close - self.open).abs();
245        let range = self.price_range();
246
247        // Consider it a doji if the body is less than 10% of the range
248        if range > dec!(0.0) {
249            body_size / range < dec!(0.1)
250        } else {
251            true // No price movement at all
252        }
253    }
254
255    /// Get the body size of the candle (absolute difference between open and close)
256    ///
257    /// # Returns
258    /// Body size as Decimal
259    pub fn body_size(&self) -> Decimal {
260        (self.close - self.open).abs()
261    }
262
263    /// Get the body size of the candle (absolute difference between open and close)
264    ///
265    /// # Returns
266    /// Body size as f64
267    pub fn body_size_f64(&self) -> BinaryOptionsResult<f64> {
268        self.body_size()
269            .to_f64()
270            .ok_or(BinaryOptionsError::ParseDecimal(
271                "Couldn't parse Decimal to f64".to_string(),
272            ))
273    }
274
275    /// Get the upper shadow length
276    ///
277    /// # Returns
278    /// Upper shadow length as Decimal
279    pub fn upper_shadow(&self) -> Decimal {
280        self.high - self.open.max(self.close)
281    }
282
283    /// Get the upper shadow length
284    ///
285    /// # Returns
286    /// Upper shadow length as f64
287    pub fn upper_shadow_f64(&self) -> BinaryOptionsResult<f64> {
288        self.upper_shadow()
289            .to_f64()
290            .ok_or(BinaryOptionsError::ParseDecimal(
291                "Couldn't parse Decimal to f64".to_string(),
292            ))
293    }
294
295    /// Get the lower shadow length
296    ///
297    /// # Returns
298    /// Lower shadow length as Decimal
299    pub fn lower_shadow(&self) -> Decimal {
300        self.open.min(self.close) - self.low
301    }
302
303    /// Get the lower shadow length
304    ///
305    /// # Returns
306    /// Lower shadow length as f64
307    pub fn lower_shadow_f64(&self) -> BinaryOptionsResult<f64> {
308        self.lower_shadow()
309            .to_f64()
310            .ok_or(BinaryOptionsError::ParseDecimal(
311                "Couldn't parse Decimal to f64".to_string(),
312            ))
313    }
314
315    /// Convert timestamp to `DateTime<Utc>`
316    ///
317    /// # Returns
318    /// `DateTime<Utc>` representation of the candle timestamp
319    pub fn datetime(&self) -> DateTime<Utc> {
320        DateTime::from_timestamp(self.timestamp, 0).unwrap_or_else(Utc::now)
321    }
322}
323
324/// Represents the type of subscription for candle data.
325#[derive(Clone, Debug)]
326pub enum SubscriptionType {
327    None,
328    Chunk {
329        size: usize,        // Number of candles to aggregate
330        current: usize,     // Current aggregated candle count
331        candle: BaseCandle, // Current aggregated candle
332    },
333    Time {
334        start_time: Option<i64>,
335        duration: Duration,
336        candle: BaseCandle,
337    },
338    TimeAligned {
339        duration: Duration,
340        candle: BaseCandle,
341        /// Stores the timestamp for the end of the current aggregation window.
342        next_boundary: Option<i64>,
343    },
344}
345
346impl BaseCandle {
347    pub fn new(
348        timestamp: i64,
349        open: f64,
350        high: f64,
351        low: f64,
352        close: f64,
353        volume: Option<f64>,
354    ) -> Self {
355        Self {
356            timestamp,
357            open,
358            high,
359            low,
360            close,
361            volume, // PocketOption doesn't provide volume
362        }
363    }
364
365    pub fn timestamp(&self) -> DateTime<Utc> {
366        DateTime::from_timestamp(self.timestamp, 0).unwrap_or_else(Utc::now)
367    }
368}
369
370/// Compiles raw tick data into candles based on the specified period.
371///
372/// # Arguments
373/// * `ticks` - Slice of history items (ticks)
374/// * `period` - Time period in seconds for each candle. Must be greater than 0.
375/// * `symbol` - Trading symbol
376///
377/// # Returns
378/// Vector of compiled Candles. Returns an empty vector if:
379/// * `ticks` is empty
380/// * `period` is 0 (to avoid division by zero)
381pub fn compile_candles_from_ticks(ticks: &[HistoryItem], period: u32, symbol: &str) -> Vec<Candle> {
382    if ticks.is_empty() || period == 0 {
383        return Vec::new();
384    }
385
386    let mut candles = Vec::new();
387    let period_i64 = period as i64;
388
389    // Sort ticks by timestamp just in case
390    let mut sorted_ticks: Vec<(i64, f64)> = ticks.iter().map(|t| t.to_tick()).collect();
391    sorted_ticks.sort_by(|a, b| a.0.cmp(&b.0));
392
393    let mut current_candle: Option<BaseCandle> = None;
394    let mut current_boundary_idx: Option<i64> = None;
395
396    for (timestamp, price) in sorted_ticks {
397        let boundary_idx = timestamp / period_i64;
398        let boundary = boundary_idx * period_i64;
399
400        if let Some(mut candle) = current_candle.take() {
401            if Some(boundary_idx) == current_boundary_idx {
402                // Same candle
403                candle.high = candle.high.max(price);
404                candle.low = candle.low.min(price);
405                candle.close = price;
406                current_candle = Some(candle);
407            } else {
408                // New candle, push old one
409                match Candle::try_from((candle, symbol.to_string())) {
410                    Ok(c) => candles.push(c),
411                    Err(e) => warn!("Failed to convert final candle for {}: {}", symbol, e),
412                }
413                // Start new candle
414                current_boundary_idx = Some(boundary_idx);
415                current_candle = Some(BaseCandle {
416                    timestamp: boundary,
417                    open: price,
418                    high: price,
419                    low: price,
420                    close: price,
421                    volume: None,
422                });
423            }
424        } else {
425            // First tick
426            current_boundary_idx = Some(boundary_idx);
427            current_candle = Some(BaseCandle {
428                timestamp: boundary,
429                open: price,
430                high: price,
431                low: price,
432                close: price,
433                volume: None,
434            });
435        }
436    }
437
438    if let Some(candle) = current_candle {
439        match Candle::try_from((candle, symbol.to_string())) {
440            Ok(c) => candles.push(c),
441            Err(e) => warn!("Failed to convert final candle for {}: {}", symbol, e),
442        }
443    }
444
445    candles
446}
447
448impl SubscriptionType {
449    pub fn none() -> Self {
450        SubscriptionType::None
451    }
452
453    pub fn chunk(size: usize) -> Self {
454        SubscriptionType::Chunk {
455            size,
456            current: 0,
457            candle: BaseCandle::default(),
458        }
459    }
460
461    pub fn time(duration: Duration) -> Self {
462        SubscriptionType::Time {
463            start_time: None,
464            duration,
465            candle: BaseCandle::default(),
466        }
467    }
468
469    /// Creates a time-aligned subscription.
470    ///
471    /// Completed candle timestamps are set to the boundary start time (the beginning of the aggregation window).
472    pub fn time_aligned(duration: Duration) -> PocketResult<Self> {
473        if 24 * 60 * 60 % duration.as_secs() != 0 {
474            warn!(
475                "Unsupported duration for time-aligned subscription: {:?}",
476                duration
477            );
478            return Err(PocketError::General(format!(
479                "Unsupported duration for time-aligned subscription: {duration:?}, duration should be a multiple of the number of seconds in a day"
480            )));
481        }
482        Ok(SubscriptionType::TimeAligned {
483            duration,
484            candle: BaseCandle::default(),
485            next_boundary: None,
486        })
487    }
488
489    pub fn period_secs(&self) -> Option<u32> {
490        match self {
491            SubscriptionType::Time { duration, .. } => Some(duration.as_secs() as u32),
492            SubscriptionType::TimeAligned { duration, .. } => Some(duration.as_secs() as u32),
493            _ => None,
494        }
495    }
496
497    pub fn update(&mut self, new_candle: &BaseCandle) -> PocketResult<Option<BaseCandle>> {
498        match self {
499            SubscriptionType::None => Ok(Some(new_candle.clone())),
500
501            SubscriptionType::Chunk {
502                size,
503                current,
504                candle,
505            } => {
506                if *current == 0 {
507                    *candle = new_candle.clone();
508                } else {
509                    candle.timestamp = new_candle.timestamp;
510                    candle.high = candle.high.max(new_candle.high);
511                    candle.low = candle.low.min(new_candle.low);
512                    candle.close = new_candle.close;
513                }
514                *current += 1;
515
516                if *current >= *size {
517                    *current = 0; // Reset for next batch
518                    Ok(Some(candle.clone()))
519                } else {
520                    Ok(None)
521                }
522            }
523
524            SubscriptionType::Time {
525                start_time,
526                duration,
527                candle,
528            } => {
529                if start_time.is_none() {
530                    *start_time = Some(new_candle.timestamp);
531                    *candle = new_candle.clone();
532                    return Ok(None);
533                }
534
535                // Update the aggregated candle
536                candle.timestamp = new_candle.timestamp;
537                candle.high = candle.high.max(new_candle.high);
538                candle.low = candle.low.min(new_candle.low);
539                candle.close = new_candle.close;
540
541                let elapsed = (new_candle.timestamp()
542                    - DateTime::from_timestamp(start_time.unwrap(), 0).unwrap_or_else(Utc::now))
543                .to_std()
544                .map_err(|_| {
545                    PocketError::General("Time calculation error in conditional update".to_string())
546                })?;
547
548                if elapsed >= *duration {
549                    *start_time = None; // Reset for next period
550                    Ok(Some(candle.clone()))
551                } else {
552                    Ok(None)
553                }
554            }
555
556            SubscriptionType::TimeAligned {
557                duration,
558                candle,
559                next_boundary,
560            } => {
561                let boundary = match *next_boundary {
562                    Some(b) => b,
563                    None => {
564                        // First candle ever processed. Initialize the state.
565                        *candle = new_candle.clone();
566                        let duration_secs = duration.as_secs() as i64;
567                        let bucket_id = new_candle.timestamp / duration_secs;
568                        let new_boundary = (bucket_id + 1) * duration_secs;
569                        *next_boundary = Some(new_boundary);
570
571                        // It's the first candle, so the window can't be complete yet.
572                        return Ok(None);
573                    }
574                };
575
576                if new_candle.timestamp < boundary {
577                    // The new candle is within the current time window. Aggregate its data.
578                    candle.high = candle.high.max(new_candle.high);
579                    candle.low = candle.low.min(new_candle.low);
580                    candle.close = new_candle.close;
581                    candle.timestamp = new_candle.timestamp;
582                    if let (Some(v_agg), Some(v_new)) = (&mut candle.volume, new_candle.volume) {
583                        *v_agg += v_new;
584                    } else if new_candle.volume.is_some() {
585                        candle.volume = new_candle.volume;
586                    }
587                    Ok(None) // The candle is not yet complete.
588                } else {
589                    // The new candle's timestamp is at or after the boundary.
590                    // The current aggregation window is now complete.
591                    // Set timestamp to the start of the period (boundary - duration)
592                    let duration_secs = duration.as_secs() as i64;
593                    candle.timestamp = boundary - duration_secs;
594                    // 1. Clone the completed candle to return it later.
595                    let completed_candle = candle.clone();
596
597                    // 2. Start the new aggregation period with the new_candle's data.
598                    *candle = new_candle.clone();
599
600                    // 3. Calculate the boundary for this new period.
601                    let bucket_id = new_candle.timestamp / duration_secs;
602                    let new_boundary = (bucket_id + 1) * duration_secs;
603                    *next_boundary = Some(new_boundary);
604
605                    // 4. Return the candle that was just completed.
606                    Ok(Some(completed_candle))
607                }
608            }
609        }
610    }
611}
612
613impl From<(i64, f64)> for BaseCandle {
614    fn from((timestamp, price): (i64, f64)) -> Self {
615        BaseCandle {
616            timestamp,
617            open: price,
618            high: price,
619            low: price,
620            close: price,
621            volume: None, // PocketOption doesn't provide volume
622        }
623    }
624}
625
626impl TryFrom<(BaseCandle, String)> for Candle {
627    type Error = BinaryOptionsError;
628
629    fn try_from(value: (BaseCandle, String)) -> Result<Self, Self::Error> {
630        let (base_candle, symbol) = value;
631        let volume = match base_candle.volume {
632            Some(v) => Some(
633                Decimal::from_f64(v)
634                    .ok_or(BinaryOptionsError::General("Couldn't parse volume".into()))?,
635            ),
636            None => None,
637        };
638        Ok(Candle {
639            symbol,
640            timestamp: base_candle.timestamp,
641            open: Decimal::from_f64(base_candle.open)
642                .ok_or(BinaryOptionsError::General("Couldn't parse open".into()))?,
643            high: Decimal::from_f64(base_candle.high)
644                .ok_or(BinaryOptionsError::General("Couldn't parse high".into()))?,
645            low: Decimal::from_f64(base_candle.low)
646                .ok_or(BinaryOptionsError::General("Couldn't parse low".into()))?,
647            close: Decimal::from_f64(base_candle.close)
648                .ok_or(BinaryOptionsError::General("Couldn't parse close".into()))?,
649            volume,
650        })
651    }
652}
653
654#[cfg(test)]
655mod tests {
656    use super::*;
657
658    #[test]
659    fn test_parse_base_candles() {
660        // Format: [timestamp, open, close, high, low]
661        let data = r#"[1754529180,0.92124,0.92155,0.92162,0.92124]"#;
662        let candle: BaseCandle = serde_json::from_str(data).unwrap();
663        assert_eq!(candle.timestamp, 1754529180);
664        assert_eq!(candle.open, 0.92124);
665        assert_eq!(candle.close, 0.92155);
666        assert_eq!(candle.high, 0.92162);
667        assert_eq!(candle.low, 0.92124);
668        assert_eq!(candle.volume, None);
669    }
670
671    #[test]
672    fn test_parse_base_candles_with_volume() {
673        // Format: [timestamp, open, close, high, low, volume]
674        let data = r#"[1754529180,0.92124,0.92155,0.92162,0.92124,100.0]"#;
675        let candle: BaseCandle = serde_json::from_str(data).unwrap();
676        assert_eq!(candle.volume, Some(100.0));
677    }
678
679    #[test]
680    fn test_parse_base_candles_with_null_volume() {
681        // Format: [timestamp, open, close, high, low, null]
682        let data = r#"[1754529180,0.92124,0.92155,0.92162,0.92124,null]"#;
683        let candle: BaseCandle = serde_json::from_str(data).unwrap();
684        assert_eq!(candle.volume, None);
685    }
686
687    #[test]
688    fn test_compile_candles_zero_period() {
689        let ticks = vec![
690            HistoryItem::Tick([1000.into(), 1.0.into()]),
691            HistoryItem::Tick([1001.into(), 1.1.into()]),
692        ];
693        let candles = compile_candles_from_ticks(&ticks, 0, "TEST");
694        assert!(candles.is_empty());
695    }
696
697    #[test]
698    fn test_compile_candles_empty_ticks() {
699        let ticks = vec![];
700        let candles = compile_candles_from_ticks(&ticks, 60, "TEST");
701        assert!(candles.is_empty());
702    }
703
704    #[test]
705    fn test_compile_candles_single_tick() {
706        let ticks = vec![HistoryItem::Tick([1000.into(), 1.5.into()])];
707        let candles = compile_candles_from_ticks(&ticks, 60, "TEST");
708        assert_eq!(candles.len(), 1);
709        let c = &candles[0];
710        // 1000 / 60 = 16.66.. -> floor 16. 16 * 60 = 960.
711        // So timestamp should be 960.
712        assert_eq!(c.timestamp, 960);
713        assert_eq!(c.open.to_string(), "1.5");
714        assert_eq!(c.high.to_string(), "1.5");
715        assert_eq!(c.low.to_string(), "1.5");
716        assert_eq!(c.close.to_string(), "1.5");
717    }
718}