binary_options_tools/framework/
mod.rs1pub mod market;
2pub mod virtual_market;
3
4use crate::framework::market::Market;
5use crate::pocketoption::candle::{Candle, SubscriptionType};
6use crate::pocketoption::error::PocketResult;
7use crate::pocketoption::pocket_client::PocketOption;
8use crate::pocketoption::types::Deal;
9use async_trait::async_trait;
10use futures_util::stream::select_all;
11use futures_util::StreamExt;
12use rust_decimal::Decimal;
13use std::sync::Arc;
14use std::time::Duration;
15use tracing::{error, info, warn};
16
17#[derive(Clone)]
19pub struct Context {
20 pub market: Arc<dyn Market>,
21 pub client: Arc<PocketOption>,
22}
23
24impl Context {
25 pub fn new(client: Arc<PocketOption>) -> Self {
26 Self {
27 market: client.clone(),
28 client,
29 }
30 }
31}
32
33#[async_trait]
35pub trait Strategy: Send + Sync {
36 async fn on_start(&self, _ctx: &Context) -> PocketResult<()> {
38 Ok(())
39 }
40
41 async fn on_candle(&self, _ctx: &Context, _asset: &str, _candle: &Candle) -> PocketResult<()> {
43 Ok(())
44 }
45
46 async fn on_tick(&self, _ctx: &Context, _asset: &str, _price: Decimal) -> PocketResult<()> {
48 Ok(())
49 }
50
51 async fn on_deal_opened(&self, _ctx: &Context, _deal: &Deal) -> PocketResult<()> {
53 Ok(())
54 }
55
56 async fn on_deal_closed(&self, _ctx: &Context, _deal: &Deal) -> PocketResult<()> {
58 Ok(())
59 }
60
61 async fn on_balance_update(&self, _ctx: &Context, _balance: Decimal) -> PocketResult<()> {
63 Ok(())
64 }
65}
66
67pub struct Bot {
69 ctx: Context,
70 strategy: Arc<Box<dyn Strategy>>,
71 assets: Vec<(String, SubscriptionType)>,
72 background_tasks: Vec<tokio::task::JoinHandle<()>>,
73 update_time: Duration, }
75
76impl Bot {
77 pub fn new(client: PocketOption, strategy: Box<dyn Strategy>) -> Self {
78 Self {
79 ctx: Context::new(Arc::new(client)),
80 strategy: Arc::new(strategy),
81 assets: Vec::new(),
82 background_tasks: Vec::new(),
83 update_time: Duration::from_secs(5), }
85 }
86
87 pub fn with_update_interval(&mut self, duration: Duration) {
88 self.update_time = duration;
89 }
90
91 pub fn with_market(mut self, market: Arc<dyn Market>) -> Self {
93 self.ctx.market = market;
94 self
95 }
96
97 pub fn add_asset(&mut self, asset: impl Into<String>, sub_type: SubscriptionType) {
99 self.assets.push((asset.into(), sub_type));
100 }
101
102 pub async fn run(&mut self) -> PocketResult<()> {
104 info!("Starting bot...");
105 self.strategy.on_start(&self.ctx).await?;
106 self.spawn_balance_task();
107
108 let mut streams = Vec::new();
109
110 for (asset, sub_type) in &self.assets {
111 info!("Subscribing to {}...", asset);
112 let stream = self
113 .ctx
114 .client
115 .subscribe(asset.clone(), sub_type.clone())
116 .await?;
117 streams.push(stream.to_stream().map({
118 let asset = asset.clone();
119 move |res| (asset.clone(), res)
120 }));
121 }
122
123 if streams.is_empty() {
124 error!("No assets added to the bot. Exiting.");
125 return Ok(());
126 }
127
128 let mut combined_stream = select_all(streams);
129
130 info!("Bot is now running.");
131 while let Some((asset, result)) = combined_stream.next().await {
132 match result {
133 Ok(candle) => {
134 if let Err(e) = self.strategy.on_candle(&self.ctx, &asset, &candle).await {
135 warn!(target: "Framework", "Strategy on_candle error for {}: {:?}", asset, e);
136 }
137 }
138 Err(e) => {
139 error!("Stream error for {}: {:?}", asset, e);
140 }
141 }
142 }
143
144 Ok(())
145 }
146
147 fn spawn_balance_task(&mut self) {
148 info!("Spawning balance update task with interval of {:?}...", self.update_time);
149 let ctx = self.ctx.clone();
150 let strategy = self.strategy.clone();
151 let time = self.update_time;
152 let mut last_balance = Decimal::ZERO;
153 let task = tokio::spawn(async move {
154 loop {
155 let balance = ctx.market.balance().await;
156 if balance != last_balance {
157 info!("Balance updated: {}", balance);
158 last_balance = balance;
159 if let Err(e) = strategy.on_balance_update(&ctx,balance).await {
160 warn!("Strategy on_balance_update error sharing balance {}: {:?}", balance, e);
161 }
162 }
163 tokio::time::sleep(time).await;
164 }
165
166 });
167 self.background_tasks.push(task);
168 }
169
170
171}