Skip to main content

binary_options_tools/framework/
mod.rs

1pub mod market;
2pub mod virtual_market;
3
4use crate::framework::market::Market;
5use crate::pocketoption::candle::{Candle, SubscriptionType};
6use crate::pocketoption::error::PocketResult;
7use crate::pocketoption::pocket_client::PocketOption;
8use crate::pocketoption::types::Deal;
9use async_trait::async_trait;
10use futures_util::stream::select_all;
11use futures_util::StreamExt;
12use rust_decimal::Decimal;
13use std::sync::Arc;
14use std::time::Duration;
15use tracing::{error, info, warn};
16
17/// The Context provides strategies with access to the trading market and other utilities.
18#[derive(Clone)]
19pub struct Context {
20    pub market: Arc<dyn Market>,
21    pub client: Arc<PocketOption>,
22}
23
24impl Context {
25    pub fn new(client: Arc<PocketOption>) -> Self {
26        Self {
27            market: client.clone(),
28            client,
29        }
30    }
31}
32
33/// The Strategy trait defines the interface for trading strategies.
34#[async_trait]
35pub trait Strategy: Send + Sync {
36    /// Called when the bot starts.
37    async fn on_start(&self, _ctx: &Context) -> PocketResult<()> {
38        Ok(())
39    }
40
41    /// Called when a new candle is received.
42    async fn on_candle(&self, _ctx: &Context, _asset: &str, _candle: &Candle) -> PocketResult<()> {
43        Ok(())
44    }
45
46    /// Called when a new tick (price update) is received.
47    async fn on_tick(&self, _ctx: &Context, _asset: &str, _price: Decimal) -> PocketResult<()> {
48        Ok(())
49    }
50
51    /// Called when a new deal is opened.
52    async fn on_deal_opened(&self, _ctx: &Context, _deal: &Deal) -> PocketResult<()> {
53        Ok(())
54    }
55    
56    /// Called when a new deal is closed
57    async fn on_deal_closed(&self, _ctx: &Context, _deal: &Deal) -> PocketResult<()> {
58        Ok(())
59    }
60
61    /// Called when the balance changes.
62    async fn on_balance_update(&self, _ctx: &Context, _balance: Decimal) -> PocketResult<()> {
63        Ok(())
64    }
65}
66
67/// The Bot manages the execution of a strategy.
68pub struct Bot {
69    ctx: Context,
70    strategy: Arc<Box<dyn Strategy>>,
71    assets: Vec<(String, SubscriptionType)>,
72    background_tasks: Vec<tokio::task::JoinHandle<()>>,
73    update_time: Duration, // Each how much time the task is called
74}
75
76impl Bot {
77    pub fn new(client: PocketOption, strategy: Box<dyn Strategy>) -> Self {
78        Self {
79            ctx: Context::new(Arc::new(client)),
80            strategy: Arc::new(strategy),
81            assets: Vec::new(),
82            background_tasks: Vec::new(),
83            update_time: Duration::from_secs(5), // Default to 5 seconds
84        }
85    }
86    
87    pub fn with_update_interval(&mut self, duration: Duration) {
88        self.update_time = duration;
89    }
90
91    /// Sets a custom market implementation (e.g., VirtualMarket for backtesting).
92    pub fn with_market(mut self, market: Arc<dyn Market>) -> Self {
93        self.ctx.market = market;
94        self
95    }
96
97    /// Adds an asset to monitor with a specific subscription type.
98    pub fn add_asset(&mut self, asset: impl Into<String>, sub_type: SubscriptionType) {
99        self.assets.push((asset.into(), sub_type));
100    }
101
102    /// Starts the bot and its strategy loop.
103    pub async fn run(&mut self) -> PocketResult<()> {
104        info!("Starting bot...");
105        self.strategy.on_start(&self.ctx).await?;
106        self.spawn_balance_task();
107        
108        let mut streams = Vec::new();
109
110        for (asset, sub_type) in &self.assets {
111            info!("Subscribing to {}...", asset);
112            let stream = self
113                .ctx
114                .client
115                .subscribe(asset.clone(), sub_type.clone())
116                .await?;
117            streams.push(stream.to_stream().map({
118                let asset = asset.clone();
119                move |res| (asset.clone(), res)
120            }));
121        }
122
123        if streams.is_empty() {
124            error!("No assets added to the bot. Exiting.");
125            return Ok(());
126        }
127
128        let mut combined_stream = select_all(streams);
129
130        info!("Bot is now running.");
131        while let Some((asset, result)) = combined_stream.next().await {
132            match result {
133                Ok(candle) => {
134                    if let Err(e) = self.strategy.on_candle(&self.ctx, &asset, &candle).await {
135                        warn!(target: "Framework", "Strategy on_candle error for {}: {:?}", asset, e);
136                    }
137                }
138                Err(e) => {
139                    error!("Stream error for {}: {:?}", asset, e);
140                }
141            }
142        }
143
144        Ok(())
145    }
146    
147    fn spawn_balance_task(&mut self) {
148        info!("Spawning balance update task with interval of {:?}...", self.update_time);
149        let ctx = self.ctx.clone();
150        let strategy = self.strategy.clone();
151        let time = self.update_time;
152        let mut last_balance = Decimal::ZERO;
153        let task = tokio::spawn(async move {
154            loop {
155                let balance =  ctx.market.balance().await;
156                if balance != last_balance {
157                    info!("Balance updated: {}", balance);
158                    last_balance = balance;
159                    if let Err(e) = strategy.on_balance_update(&ctx,balance).await {
160                        warn!("Strategy on_balance_update error sharing balance {}: {:?}", balance, e);
161                    } 
162                }
163                tokio::time::sleep(time).await;
164            }
165            
166        });
167        self.background_tasks.push(task);
168    }
169    
170    
171}