v_exchanges 0.17.3

Implementations of HTTP/HTTPS/WebSocket API methods for some crypto exchanges, using the [crypto-botters](<https://github.com/negi-grass/crypto-botters>) framework.
Documentation
use adapters::{
	Client,
	binance::{BinanceOption, BinanceWsHandler, BinanceWsUrl},
	generics::ws::{WsConnection, WsError},
};
use jiff::Timestamp;
use serde_with::{DisplayFromStr, serde_as};
use v_utils::trades::Pair;

use crate::{BookShape, BookUpdate, ExchangeStream, Instrument, Trade};

// trades {{{
/// Stream of trades backed by a Binance websocket connection.
///
/// Dereferences (immutably and mutably) to the wrapped [`WsConnection`].
#[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
pub struct TradesConnection {
	/// Underlying websocket connection; this is the `Deref`/`DerefMut` target.
	#[deref]
	#[deref_mut]
	connection: WsConnection<BinanceWsHandler>,
	/// Market the stream was created for. Decides which event layout (`TradeEventPerp` or
	/// `TradeEventSpot`) incoming payloads are deserialized into.
	instrument: Instrument,
}
impl TradesConnection {
	/// Builds a trade-stream connection for `pairs` on the market given by `instrument`.
	///
	/// Every pair is subscribed through a `<symbol>@trade` topic, where `<symbol>` is the
	/// lowercased Binance spelling of the pair. `Perp` uses the USD-M futures endpoint, while
	/// `Spot` and `Margin` share the spot endpoint.
	///
	/// # Errors
	/// Propagates the error returned by `client.ws_connection`.
	///
	/// # Panics
	/// Panics (`unimplemented!`) for any instrument other than `Perp`, `Spot` or `Margin`.
	pub fn try_new(client: &Client, pairs: Vec<Pair>, instrument: Instrument) -> Result<Self, WsError> {
		let mut topics = Vec::with_capacity(pairs.len());
		for pair in pairs {
			topics.push(format!("{}@trade", pair.fmt_binance().to_lowercase()));
		}

		let ws_url = match instrument {
			Instrument::Spot | Instrument::Margin => BinanceWsUrl::Spot,
			Instrument::Perp => BinanceWsUrl::FuturesUsdM,
			_ => unimplemented!(),
		};

		let options = vec![BinanceOption::WsUrl(ws_url), BinanceOption::WsTopics(topics)];
		let connection = client.ws_connection("", options)?;

		Ok(Self { connection, instrument })
	}
}
#[async_trait::async_trait]
impl ExchangeStream for TradesConnection {
	type Item = Trade;

	async fn next(&mut self) -> Result<Self::Item, WsError> {
		loop {
			let content_event = self.connection.next().await?;
			let trade = match self.instrument {
				Instrument::Perp => {
					let parsed = serde_json::from_value::<TradeEventPerp>(content_event.data.clone()).expect("Exchange responded with invalid trade event");
					Trade::from(parsed)
				}
				Instrument::Spot | Instrument::Margin => {
					let parsed = serde_json::from_value::<TradeEventSpot>(content_event.data.clone()).expect("Exchange responded with invalid trade event");
					Trade::from(parsed)
				}
				_ => unimplemented!(),
			};
			if trade.price == 0.0 || trade.qty_asset == 0.0 {
				tracing::debug!(
					raw_json = %content_event.data,
					topic = %content_event.topic,
					event_type = %content_event.event_type,
					event_time = %content_event.time,
					"Binance sent a zero-valued trade, skipping.\nWas deserialized to: {trade:?}\nReportedly, means non-orderbook trades. Look at `X` value for more info (could be in: {{ADL, INSURANCE_FUND, NA}})",
				);
				continue;
			}
			return Ok(trade);
		}
	}
}

/// Trade event payload as deserialized for the `Perp` instrument.
///
/// Field names follow the single-letter keys of the exchange payload (see the `rename`
/// attributes). Underscore-prefixed fields are deserialized but not used when converting into
/// [`Trade`].
#[serde_as]
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
pub struct TradeEventPerp {
	/// Trade time, in milliseconds (it is passed to `Timestamp::from_millisecond`).
	#[serde(rename = "T")]
	timestamp: i64,
	/// The `X` key. Per the skip-log message in this file, values such as `ADL`,
	/// `INSURANCE_FUND` or `NA` reportedly mark non-orderbook trades.
	#[serde(rename = "X")]
	_order_type: String,
	/// The `m` key. NOTE(review): presumably "buyer is the market maker" - confirm against the
	/// exchange documentation.
	#[serde(rename = "m")]
	_is_maker: bool,
	/// Traded quantity; arrives as a string and is parsed through `DisplayFromStr`.
	#[serde_as(as = "DisplayFromStr")]
	#[serde(rename = "q")]
	qty_asset: f64,
	/// Trade price; arrives as a string and is parsed through `DisplayFromStr`.
	#[serde_as(as = "DisplayFromStr")]
	#[serde(rename = "p")]
	price: f64,
	/// The `s` key (symbol of the traded pair, judging by the field name).
	#[serde(rename = "s")]
	_pair: String,
	/// The `t` key (trade identifier, judging by the field name).
	#[serde(rename = "t")]
	_trade_id: u64,
}
impl From<TradeEventPerp> for Trade {
	fn from(futs: TradeEventPerp) -> Self {
		Self {
			time: Timestamp::from_millisecond(futs.timestamp).expect("Exchange responded with invalid timestamp"),
			qty_asset: futs.qty_asset,
			price: futs.price,
		}
	}
}

/// Trade event payload as deserialized for the `Spot` and `Margin` instruments.
///
/// Field names follow the single-letter keys of the exchange payload (see the `rename`
/// attributes).
#[serde_as]
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
pub struct TradeEventSpot {
	/// Trade time, in milliseconds (it is passed to `Timestamp::from_millisecond`).
	#[serde(rename = "T")]
	timestamp: i64,
	/// Traded quantity; arrives as a string and is parsed through `DisplayFromStr`.
	#[serde_as(as = "DisplayFromStr")]
	#[serde(rename = "q")]
	qty_asset: f64,
	/// Trade price; arrives as a string and is parsed through `DisplayFromStr`.
	#[serde_as(as = "DisplayFromStr")]
	#[serde(rename = "p")]
	price: f64,
	/// The `s` key (symbol of the traded pair, judging by the field name); not used when
	/// converting into [`Trade`].
	#[serde(rename = "s")]
	_pair: String,
}
impl From<TradeEventSpot> for Trade {
	fn from(futs: TradeEventSpot) -> Self {
		Self {
			time: Timestamp::from_millisecond(futs.timestamp).expect("Exchange responded with invalid timestamp"),
			qty_asset: futs.qty_asset,
			price: futs.price,
		}
	}
}

//,}}}

// book {{{
/// Stream of order-book depth updates backed by a Binance websocket connection.
///
/// Dereferences (immutably and mutably) to the wrapped [`WsConnection`].
#[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
pub struct BookConnection {
	/// Underlying websocket connection; this is the `Deref`/`DerefMut` target.
	#[deref]
	#[deref_mut]
	connection: WsConnection<BinanceWsHandler>,
}
impl BookConnection {
	/// Builds a depth-stream connection for `pairs` on the market given by `instrument`.
	///
	/// Every pair is subscribed through a `<symbol>@depth@100ms` topic, where `<symbol>` is the
	/// lowercased Binance spelling of the pair. `Perp` uses the USD-M futures endpoint, while
	/// `Spot` and `Margin` share the spot endpoint.
	///
	/// # Errors
	/// Propagates the error returned by `client.ws_connection`.
	///
	/// # Panics
	/// Panics (`unimplemented!`) for any instrument other than `Perp`, `Spot` or `Margin`.
	pub fn try_new(client: &Client, pairs: Vec<Pair>, instrument: Instrument) -> Result<Self, WsError> {
		let topics: Vec<_> = pairs
			.into_iter()
			.map(|pair| {
				let symbol = pair.fmt_binance().to_lowercase();
				format!("{symbol}@depth@100ms")
			})
			.collect();

		let ws_url = match instrument {
			Instrument::Spot | Instrument::Margin => BinanceWsUrl::Spot,
			Instrument::Perp => BinanceWsUrl::FuturesUsdM,
			_ => unimplemented!(),
		};

		let options = vec![BinanceOption::WsUrl(ws_url), BinanceOption::WsTopics(topics)];
		let connection = client.ws_connection("", options)?;

		Ok(Self { connection })
	}
}
#[async_trait::async_trait]
impl ExchangeStream for BookConnection {
	type Item = BookUpdate;

	async fn next(&mut self) -> Result<Self::Item, WsError> {
		let content_event = self.connection.next().await?;
		let parsed: DepthEvent = serde_json::from_value(content_event.data.clone()).expect("Exchange responded with invalid depth event");
		let time = parsed
			.transaction_time
			.map(|ts| Timestamp::from_millisecond(ts).expect("Exchange responded with invalid timestamp"))
			.unwrap_or(content_event.time);
		let shape = BookShape {
			time,
			bids: parsed.bids.into_iter().map(|(p, q)| (p.parse().unwrap(), q.parse().unwrap())).collect(),
			asks: parsed.asks.into_iter().map(|(p, q)| (p.parse().unwrap(), q.parse().unwrap())).collect(),
		};
		Ok(BookUpdate::Delta(shape))
	}
}

/// Binance diff depth stream event.
///
/// Only the fields needed to build a book delta are deserialized.
///
/// Spot: <https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#diff-depth-stream>
/// Futures: <https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Diff-Book-Depth-Streams>
#[derive(Clone, Debug, serde::Deserialize)]
struct DepthEvent {
	/// Transaction time in milliseconds. Present on futures, absent on spot.
	#[serde(rename = "T")]
	transaction_time: Option<i64>,
	/// Bids: [[price, qty], ...]. Both values are kept as the raw strings sent by the exchange
	/// and parsed by the consumer.
	#[serde(rename = "b")]
	bids: Vec<(String, String)>,
	/// Asks: [[price, qty], ...]. Both values are kept as the raw strings sent by the exchange
	/// and parsed by the consumer.
	#[serde(rename = "a")]
	asks: Vec<(String, String)>,
}
//,}}}