pub mod market;
pub mod virtual_market;
use crate::framework::market::Market;
use crate::pocketoption::candle::{Candle, SubscriptionType};
use crate::pocketoption::error::PocketResult;
use crate::pocketoption::pocket_client::PocketOption;
use crate::pocketoption::types::Deal;
use async_trait::async_trait;
use futures_util::stream::select_all;
use futures_util::StreamExt;
use rust_decimal::Decimal;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info, warn};
#[derive(Clone)]
pub struct Context {
pub market: Arc<dyn Market>,
pub client: Arc<PocketOption>,
}
impl Context {
pub fn new(client: Arc<PocketOption>) -> Self {
Self {
market: client.clone(),
client,
}
}
}
#[async_trait]
pub trait Strategy: Send + Sync {
async fn on_start(&self, _ctx: &Context) -> PocketResult<()> {
Ok(())
}
async fn on_candle(&self, _ctx: &Context, _asset: &str, _candle: &Candle) -> PocketResult<()> {
Ok(())
}
async fn on_tick(&self, _ctx: &Context, _asset: &str, _price: Decimal) -> PocketResult<()> {
Ok(())
}
async fn on_deal_opened(&self, _ctx: &Context, _deal: &Deal) -> PocketResult<()> {
Ok(())
}
async fn on_deal_closed(&self, _ctx: &Context, _deal: &Deal) -> PocketResult<()> {
Ok(())
}
async fn on_balance_update(&self, _ctx: &Context, _balance: Decimal) -> PocketResult<()> {
Ok(())
}
}
pub struct Bot {
ctx: Context,
strategy: Arc<Box<dyn Strategy>>,
assets: Vec<(String, SubscriptionType)>,
background_tasks: Vec<tokio::task::JoinHandle<()>>,
update_time: Duration, }
impl Bot {
pub fn new(client: PocketOption, strategy: Box<dyn Strategy>) -> Self {
Self {
ctx: Context::new(Arc::new(client)),
strategy: Arc::new(strategy),
assets: Vec::new(),
background_tasks: Vec::new(),
update_time: Duration::from_secs(5), }
}
pub fn with_update_interval(&mut self, duration: Duration) {
self.update_time = duration;
}
pub fn with_market(mut self, market: Arc<dyn Market>) -> Self {
self.ctx.market = market;
self
}
pub fn add_asset(&mut self, asset: impl Into<String>, sub_type: SubscriptionType) {
self.assets.push((asset.into(), sub_type));
}
pub async fn run(&mut self) -> PocketResult<()> {
info!("Starting bot...");
self.strategy.on_start(&self.ctx).await?;
self.spawn_balance_task();
let mut streams = Vec::new();
for (asset, sub_type) in &self.assets {
info!("Subscribing to {}...", asset);
let stream = self
.ctx
.client
.subscribe(asset.clone(), sub_type.clone())
.await?;
streams.push(stream.to_stream().map({
let asset = asset.clone();
move |res| (asset.clone(), res)
}));
}
if streams.is_empty() {
error!("No assets added to the bot. Exiting.");
return Ok(());
}
let mut combined_stream = select_all(streams);
info!("Bot is now running.");
while let Some((asset, result)) = combined_stream.next().await {
match result {
Ok(candle) => {
if let Err(e) = self.strategy.on_candle(&self.ctx, &asset, &candle).await {
warn!(target: "Framework", "Strategy on_candle error for {}: {:?}", asset, e);
}
}
Err(e) => {
error!("Stream error for {}: {:?}", asset, e);
}
}
}
Ok(())
}
fn spawn_balance_task(&mut self) {
info!("Spawning balance update task with interval of {:?}...", self.update_time);
let ctx = self.ctx.clone();
let strategy = self.strategy.clone();
let time = self.update_time;
let mut last_balance = Decimal::ZERO;
let task = tokio::spawn(async move {
loop {
let balance = ctx.market.balance().await;
if balance != last_balance {
info!("Balance updated: {}", balance);
last_balance = balance;
if let Err(e) = strategy.on_balance_update(&ctx,balance).await {
warn!("Strategy on_balance_update error sharing balance {}: {:?}", balance, e);
}
}
tokio::time::sleep(time).await;
}
});
self.background_tasks.push(task);
}
}