binary_options_tools 0.2.0

High-level library for binary options trading automation. Supports PocketOption and ExpertOption with real-time data streaming, WebSocket API access, and automated trading strategies.
Documentation
pub mod market;
pub mod virtual_market;

use crate::framework::market::Market;
use crate::pocketoption::candle::{Candle, SubscriptionType};
use crate::pocketoption::error::PocketResult;
use crate::pocketoption::pocket_client::PocketOption;
use crate::pocketoption::types::Deal;
use async_trait::async_trait;
use futures_util::stream::select_all;
use futures_util::StreamExt;
use rust_decimal::Decimal;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info, warn};

/// The Context provides strategies with access to the trading market and other utilities.
#[derive(Clone)]
pub struct Context {
    pub market: Arc<dyn Market>,
    pub client: Arc<PocketOption>,
}

impl Context {
    pub fn new(client: Arc<PocketOption>) -> Self {
        Self {
            market: client.clone(),
            client,
        }
    }
}

/// The Strategy trait defines the interface for trading strategies.
#[async_trait]
pub trait Strategy: Send + Sync {
    /// Called when the bot starts.
    async fn on_start(&self, _ctx: &Context) -> PocketResult<()> {
        Ok(())
    }

    /// Called when a new candle is received.
    async fn on_candle(&self, _ctx: &Context, _asset: &str, _candle: &Candle) -> PocketResult<()> {
        Ok(())
    }

    /// Called when a new tick (price update) is received.
    async fn on_tick(&self, _ctx: &Context, _asset: &str, _price: Decimal) -> PocketResult<()> {
        Ok(())
    }

    /// Called when a new deal is opened.
    async fn on_deal_opened(&self, _ctx: &Context, _deal: &Deal) -> PocketResult<()> {
        Ok(())
    }
    
    /// Called when a new deal is closed
    async fn on_deal_closed(&self, _ctx: &Context, _deal: &Deal) -> PocketResult<()> {
        Ok(())
    }

    /// Called when the balance changes.
    async fn on_balance_update(&self, _ctx: &Context, _balance: Decimal) -> PocketResult<()> {
        Ok(())
    }
}

/// The Bot manages the execution of a strategy.
pub struct Bot {
    ctx: Context,
    strategy: Arc<Box<dyn Strategy>>,
    assets: Vec<(String, SubscriptionType)>,
    background_tasks: Vec<tokio::task::JoinHandle<()>>,
    update_time: Duration, // Each how much time the task is called
}

impl Bot {
    pub fn new(client: PocketOption, strategy: Box<dyn Strategy>) -> Self {
        Self {
            ctx: Context::new(Arc::new(client)),
            strategy: Arc::new(strategy),
            assets: Vec::new(),
            background_tasks: Vec::new(),
            update_time: Duration::from_secs(5), // Default to 5 seconds
        }
    }
    
    pub fn with_update_interval(&mut self, duration: Duration) {
        self.update_time = duration;
    }

    /// Sets a custom market implementation (e.g., VirtualMarket for backtesting).
    pub fn with_market(mut self, market: Arc<dyn Market>) -> Self {
        self.ctx.market = market;
        self
    }

    /// Adds an asset to monitor with a specific subscription type.
    pub fn add_asset(&mut self, asset: impl Into<String>, sub_type: SubscriptionType) {
        self.assets.push((asset.into(), sub_type));
    }

    /// Starts the bot and its strategy loop.
    pub async fn run(&mut self) -> PocketResult<()> {
        info!("Starting bot...");
        self.strategy.on_start(&self.ctx).await?;
        self.spawn_balance_task();
        
        let mut streams = Vec::new();

        for (asset, sub_type) in &self.assets {
            info!("Subscribing to {}...", asset);
            let stream = self
                .ctx
                .client
                .subscribe(asset.clone(), sub_type.clone())
                .await?;
            streams.push(stream.to_stream().map({
                let asset = asset.clone();
                move |res| (asset.clone(), res)
            }));
        }

        if streams.is_empty() {
            error!("No assets added to the bot. Exiting.");
            return Ok(());
        }

        let mut combined_stream = select_all(streams);

        info!("Bot is now running.");
        while let Some((asset, result)) = combined_stream.next().await {
            match result {
                Ok(candle) => {
                    if let Err(e) = self.strategy.on_candle(&self.ctx, &asset, &candle).await {
                        warn!(target: "Framework", "Strategy on_candle error for {}: {:?}", asset, e);
                    }
                }
                Err(e) => {
                    error!("Stream error for {}: {:?}", asset, e);
                }
            }
        }

        Ok(())
    }
    
    fn spawn_balance_task(&mut self) {
        info!("Spawning balance update task with interval of {:?}...", self.update_time);
        let ctx = self.ctx.clone();
        let strategy = self.strategy.clone();
        let time = self.update_time;
        let mut last_balance = Decimal::ZERO;
        let task = tokio::spawn(async move {
            loop {
                let balance =  ctx.market.balance().await;
                if balance != last_balance {
                    info!("Balance updated: {}", balance);
                    last_balance = balance;
                    if let Err(e) = strategy.on_balance_update(&ctx,balance).await {
                        warn!("Strategy on_balance_update error sharing balance {}: {:?}", balance, e);
                    } 
                }
                tokio::time::sleep(time).await;
            }
            
        });
        self.background_tasks.push(task);
    }
    
    
}