use chrono::{DateTime, Utc};
use color_eyre::eyre::Result;
use discretionary_engine_macros::ProtocolWrapper;
use futures_util::StreamExt;
use serde_json::Value;
use tokio::{sync::mpsc, task::JoinSet};
use tokio_tungstenite::connect_async;
use v_utils::{macros::CompactFormat, trades::Side};
use crate::{
exchange_apis::{order_types::*, Market, Symbol},
protocols::{ProtocolOrders, ProtocolTrait, ProtocolType},
};
/// Parameters of the approaching-limit protocol.
///
/// NOTE(review): the `ProtocolWrapper` derive presumably generates the
/// `ApproachingLimitWrapper` type that implements `ProtocolTrait` — confirm against the macro.
#[derive(Debug, Clone, CompactFormat, derive_new::new, Default, Copy, ProtocolWrapper)]
pub struct ApproachingLimit {
// Forwarded unchanged to `ApproachingLimitIndicator::step` for every received price.
deadline: DateTime<Utc>,
}
impl ProtocolTrait for ApproachingLimitWrapper {
type Params = ApproachingLimit;
fn attach(&self, position_js: &mut JoinSet<Result<()>>, tx_orders: mpsc::Sender<ProtocolOrders>, asset: String, protocol_side: Side) -> Result<()> {
let symbol = Symbol {
base: asset,
quote: "USDT".to_owned(),
market: Market::BinanceFutures,
};
let address = format!("wss://fstream.binance.com/ws/{}@aggTrade", symbol.to_string().to_lowercase());
let params = self.0.clone();
let (tx, mut rx) = tokio::sync::mpsc::channel::<f64>(256);
position_js.spawn(async move {
let mut js = JoinSet::new();
js.spawn(async move {
let (ws_stream, _) = connect_async(address).await.unwrap();
let (_, mut read) = ws_stream.split();
while let Some(msg) = read.next().await {
let data = msg.unwrap().into_data();
match serde_json::from_slice::<Value>(&data) {
Ok(json) =>
if let Some(price_str) = json.get("p") {
let price: f64 = price_str.as_str().unwrap().parse().unwrap();
tx.send(price).await.unwrap();
},
Err(e) => {
println!("Failed to parse message as JSON: {}", e);
}
}
}
});
js.spawn(async move {
let mut al_indicator = ApproachingLimitIndicator::new();
while let Some(price) = rx.recv().await {
let maybe_order = al_indicator.step(price, params.read().unwrap().deadline, protocol_side, &symbol);
if let Some(order) = maybe_order {
let protocol_spec = params.read().unwrap().to_string();
let protocol_orders = ProtocolOrders::new(protocol_spec, vec![Some(order)]);
tx_orders.send(protocol_orders).await.unwrap();
}
}
});
js.join_all().await;
Ok(())
});
Ok(())
}
fn update_params(&self, new_params: ApproachingLimit) -> Result<()> {
*self.0.write().unwrap() = new_params;
Ok(())
}
fn get_type(&self) -> ProtocolType {
ProtocolType::Momentum
}
}
/// State carried between successive `step` calls of the approaching-limit protocol.
///
/// NOTE(review): `step` is not implemented yet, so the exact meaning of the fields below
/// is not established by the code — confirm once the logic lands.
#[derive(Clone, Debug, Default, Copy)]
struct ApproachingLimitIndicator {
// Starts at 0.0 (see `new`); presumably the upper bound tracked by the indicator — TODO confirm.
top: f64,
// Starts at 0.0 (see `new`); presumably the lower bound tracked by the indicator — TODO confirm.
bottom: f64,
}
impl ApproachingLimitIndicator {
fn new() -> Self {
Self { top: 0.0, bottom: 0.0 }
}
fn step(&mut self, price: f64, deadline: DateTime<Utc>, side: Side, symbol: &Symbol) -> Option<ConceptualOrderPercents> {
unimplemented!();
None
}
}