use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, RwLock};
use crate::error::{Error, Result};
use crate::types::orderbook::{ObMessage, OrderbookLevel, OrderbookUpdate};
use super::state::LocalOrderbook;
use super::stream::{ObStream, ObStreamOptions};
/// Connection options for [`OrderbookEngine::connect`].
///
/// Each field is copied verbatim into the `ObStreamOptions` passed to the
/// underlying stream; the engine itself does not interpret them.
///
/// The type is plain data, so it also derives `PartialEq`/`Eq` to let callers
/// and tests compare configurations by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EngineOptions {
    /// Forwarded to `ObStreamOptions::compress`.
    /// NOTE(review): presumably toggles compression on the websocket stream — confirm
    /// against `ObStreamOptions`.
    pub compress: bool,
    /// Forwarded to `ObStreamOptions::auto_reconnect`.
    pub auto_reconnect: bool,
    /// Forwarded to `ObStreamOptions::max_reconnect_attempts`.
    /// NOTE(review): `None` presumably means "no limit" — confirm against the stream
    /// implementation.
    pub max_reconnect_attempts: Option<u32>,
}
impl Default for EngineOptions {
fn default() -> Self {
Self {
compress: true,
auto_reconnect: true,
max_reconnect_attempts: None,
}
}
}
/// Routing entry for one view: its token filter plus the sending half of the
/// channel that feeds the view. Entries are stored in the engine's `views` list.
struct ViewInner {
/// Asset IDs this view is interested in. The engine task forwards an update only
/// when the update's `asset_id` is contained in this set.
tokens: HashSet<String>,
/// Sender the engine task pushes matching updates into using `try_send`.
tx: mpsc::Sender<OrderbookUpdate>,
}
/// Handle to a running orderbook engine: the shared local orderbook, the list of
/// registered views, and the command channel / join handle of the background task
/// spawned by `connect`.
pub struct OrderbookEngine {
/// Local orderbook state. The background task writes snapshots and updates into it;
/// the accessor methods and every `EngineView` read from it.
state: Arc<RwLock<LocalOrderbook>>,
/// Views created through `view()`. The background task reads this list to fan
/// updates out to each view's channel.
views: Arc<RwLock<Vec<Arc<RwLock<ViewInner>>>>>,
/// Sending half of the command channel consumed by the background task.
cmd_tx: mpsc::Sender<EngineCommand>,
/// Join handle of the background task. It is stored here but is not awaited or
/// aborted anywhere in the code visible in this file.
_task: tokio::task::JoinHandle<()>,
}
/// Commands sent from `OrderbookEngine` methods to the background task.
enum EngineCommand {
/// Identifiers to pass to the stream's `subscribe` call; sent by
/// `OrderbookEngine::subscribe`.
Subscribe(Vec<String>),
/// Makes the background task break out of its loop; sent by
/// `OrderbookEngine::close`.
Close,
}
impl OrderbookEngine {
    /// Connects to the orderbook websocket and spawns the background task that
    /// keeps the local orderbook current and fans updates out to views.
    ///
    /// The task loops over two event sources:
    /// * stream messages — `Snapshot` and `Update` are applied to the shared
    ///   state; every update except `PriceChange` is then offered to each view
    ///   whose token filter contains the update's `asset_id`;
    /// * commands — `Subscribe` is forwarded to the stream, `Close` (or all
    ///   command senders being dropped) ends the task.
    ///
    /// # Errors
    /// Returns whatever error `ObStream::connect` produces.
    pub async fn connect(api_key: &str, options: EngineOptions) -> Result<Self> {
        let ob_url = "wss://ob.polynode.dev/ws";
        let stream_opts = ObStreamOptions {
            compress: options.compress,
            auto_reconnect: options.auto_reconnect,
            max_reconnect_attempts: options.max_reconnect_attempts,
            ..Default::default()
        };
        let mut stream = ObStream::connect(api_key, ob_url, stream_opts).await?;
        let state = Arc::new(RwLock::new(LocalOrderbook::new()));
        let views: Arc<RwLock<Vec<Arc<RwLock<ViewInner>>>>> = Arc::new(RwLock::new(Vec::new()));
        let (cmd_tx, mut cmd_rx) = mpsc::channel::<EngineCommand>(64);
        let state_clone = state.clone();
        let views_clone = views.clone();
        let task = tokio::spawn(async move {
            loop {
                tokio::select! {
                    msg = stream.next() => {
                        match msg {
                            Some(Ok(ObMessage::Update(update))) => {
                                // Apply to the shared book first; the write guard is
                                // scoped so it is released before fanning out.
                                {
                                    let mut book = state_clone.write().await;
                                    match &update {
                                        OrderbookUpdate::Snapshot(snap) => book.apply_snapshot(snap),
                                        OrderbookUpdate::Update(delta) => book.apply_update(delta),
                                        OrderbookUpdate::PriceChange(_) => {}
                                        OrderbookUpdate::LastTradePrice(_) => {}
                                    }
                                }
                                // `PriceChange` is not routed to views: no asset id is
                                // extracted for it here.
                                let asset_id = match &update {
                                    OrderbookUpdate::Snapshot(snap) => &snap.asset_id,
                                    OrderbookUpdate::Update(delta) => &delta.asset_id,
                                    OrderbookUpdate::PriceChange(_) => continue,
                                    OrderbookUpdate::LastTradePrice(trade) => &trade.asset_id,
                                };
                                let mut saw_closed = false;
                                {
                                    let registered = views_clone.read().await;
                                    for view_arc in registered.iter() {
                                        let view = view_arc.read().await;
                                        // The sender reports closed once the view's
                                        // receiver is gone; nothing can consume the
                                        // update, so skip it and prune below.
                                        if view.tx.is_closed() {
                                            saw_closed = true;
                                            continue;
                                        }
                                        if view.tokens.contains(asset_id) {
                                            // Best effort: a full view channel drops
                                            // this update rather than stalling the engine.
                                            let _ = view.tx.try_send(update.clone());
                                        }
                                    }
                                }
                                if saw_closed {
                                    // Remove entries for dropped views so the list does
                                    // not grow forever. An entry that is momentarily
                                    // write-locked (`set_tokens`) is kept and re-checked
                                    // on a later pass.
                                    views_clone.write().await.retain(|entry| match entry.try_read() {
                                        Ok(inner) => !inner.tx.is_closed(),
                                        Err(_) => true,
                                    });
                                }
                            }
                            // Other message kinds are ignored.
                            Some(Ok(_)) => {}
                            // Stream errors are ignored here and the loop keeps polling.
                            // NOTE(review): presumably recovery is handled inside
                            // `ObStream` — confirm.
                            Some(Err(_)) => {}
                            None => break,
                        }
                    }
                    cmd = cmd_rx.recv() => {
                        match cmd {
                            Some(EngineCommand::Subscribe(ids)) => {
                                // The subscribe result is discarded; callers of
                                // `subscribe()` only learn whether the command was queued.
                                let _ = stream.subscribe(ids).await;
                            }
                            Some(EngineCommand::Close) | None => break,
                        }
                    }
                }
            }
        });
        Ok(Self {
            state,
            views,
            cmd_tx,
            _task: task,
        })
    }

    /// Queues a subscribe request for `identifiers` on the background task.
    ///
    /// # Errors
    /// Returns `Error::Disconnected` when the command channel is closed, i.e. the
    /// background task has exited. A failure of the stream-level subscribe call
    /// itself is not reported here.
    pub async fn subscribe(&self, identifiers: Vec<String>) -> Result<()> {
        self.cmd_tx
            .send(EngineCommand::Subscribe(identifiers))
            .await
            .map_err(|_| Error::Disconnected)
    }

    /// Creates a view that receives updates whose `asset_id` is in `token_ids`.
    ///
    /// The view is added to the dispatch list before this method returns whenever
    /// the list lock is free, so updates arriving right after the call are not
    /// missed. Only when the lock is contended does registration fall back to a
    /// spawned task, which completes as soon as the lock is released; that
    /// fallback requires a Tokio runtime context.
    ///
    /// The view's channel holds up to 1024 pending updates; updates that do not
    /// fit are dropped by the engine task.
    pub fn view(&self, token_ids: Vec<String>) -> EngineView {
        let (tx, rx) = mpsc::channel(1024);
        let inner = Arc::new(RwLock::new(ViewInner {
            tokens: token_ids.into_iter().collect(),
            tx,
        }));
        match self.views.try_write() {
            Ok(mut registered) => registered.push(Arc::clone(&inner)),
            Err(_) => {
                let views = Arc::clone(&self.views);
                let pending = Arc::clone(&inner);
                tokio::spawn(async move {
                    views.write().await.push(pending);
                });
            }
        }
        EngineView {
            state: self.state.clone(),
            _inner: inner,
            rx,
        }
    }

    /// Returns a new handle to the shared local orderbook.
    pub fn state(&self) -> Arc<RwLock<LocalOrderbook>> {
        self.state.clone()
    }

    /// Returns `LocalOrderbook::midpoint` for `token_id`, read under the state lock.
    pub async fn midpoint(&self, token_id: &str) -> Option<f64> {
        self.state.read().await.midpoint(token_id)
    }

    /// Returns `LocalOrderbook::spread` for `token_id`, read under the state lock.
    pub async fn spread(&self, token_id: &str) -> Option<f64> {
        self.state.read().await.spread(token_id)
    }

    /// Returns a clone of the level reported by `LocalOrderbook::best_bid`, if any.
    pub async fn best_bid(&self, token_id: &str) -> Option<OrderbookLevel> {
        self.state.read().await.best_bid(token_id).cloned()
    }

    /// Returns a clone of the level reported by `LocalOrderbook::best_ask`, if any.
    pub async fn best_ask(&self, token_id: &str) -> Option<OrderbookLevel> {
        self.state.read().await.best_ask(token_id).cloned()
    }

    /// Returns owned copies of the two level lists from `LocalOrderbook::get_book`,
    /// in the order that method yields them, or `None` when it returns `None`.
    pub async fn book(&self, token_id: &str) -> Option<(Vec<OrderbookLevel>, Vec<OrderbookLevel>)> {
        self.state
            .read()
            .await
            .get_book(token_id)
            .map(|(b, a)| (b.to_vec(), a.to_vec()))
    }

    /// Returns `LocalOrderbook::len`.
    /// NOTE(review): presumably the number of books currently held — confirm.
    pub async fn len(&self) -> usize {
        self.state.read().await.len()
    }

    /// Returns `LocalOrderbook::tracked_tokens`.
    pub async fn tracked_tokens(&self) -> Vec<String> {
        self.state.read().await.tracked_tokens()
    }

    /// Returns `LocalOrderbook::last_change` for `token_id`.
    pub async fn last_change(&self, token_id: &str) -> Option<Instant> {
        self.state.read().await.last_change(token_id)
    }

    /// Returns `LocalOrderbook::inactive_since` for `threshold`.
    pub async fn inactive_since(&self, threshold: Duration) -> Vec<String> {
        self.state.read().await.inactive_since(threshold)
    }

    /// Batch form of [`midpoint`](Self::midpoint) for the given tokens.
    pub async fn midpoints(&self, token_ids: &[String]) -> HashMap<String, f64> {
        self.state.read().await.midpoints(token_ids)
    }

    /// Returns `LocalOrderbook::midpoints_all`.
    pub async fn midpoints_all(&self) -> HashMap<String, f64> {
        self.state.read().await.midpoints_all()
    }

    /// Batch form of [`spread`](Self::spread) for the given tokens.
    pub async fn spreads(&self, token_ids: &[String]) -> HashMap<String, f64> {
        self.state.read().await.spreads(token_ids)
    }

    /// Returns `LocalOrderbook::spreads_all`.
    pub async fn spreads_all(&self) -> HashMap<String, f64> {
        self.state.read().await.spreads_all()
    }

    /// Batch form of [`best_bid`](Self::best_bid) for the given tokens.
    pub async fn best_bids(&self, token_ids: &[String]) -> HashMap<String, OrderbookLevel> {
        self.state.read().await.best_bids(token_ids)
    }

    /// Returns `LocalOrderbook::best_bids_all`.
    pub async fn best_bids_all(&self) -> HashMap<String, OrderbookLevel> {
        self.state.read().await.best_bids_all()
    }

    /// Batch form of [`best_ask`](Self::best_ask) for the given tokens.
    pub async fn best_asks(&self, token_ids: &[String]) -> HashMap<String, OrderbookLevel> {
        self.state.read().await.best_asks(token_ids)
    }

    /// Returns `LocalOrderbook::best_asks_all`.
    pub async fn best_asks_all(&self) -> HashMap<String, OrderbookLevel> {
        self.state.read().await.best_asks_all()
    }

    /// Batch form of [`book`](Self::book) for the given tokens.
    pub async fn books(&self, token_ids: &[String]) -> HashMap<String, (Vec<OrderbookLevel>, Vec<OrderbookLevel>)> {
        self.state.read().await.books(token_ids)
    }

    /// Returns `LocalOrderbook::books_all`.
    pub async fn books_all(&self) -> HashMap<String, (Vec<OrderbookLevel>, Vec<OrderbookLevel>)> {
        self.state.read().await.books_all()
    }

    /// Asks the background task to stop and consumes the engine.
    ///
    /// The `Close` command is only queued: this method does not wait for the task
    /// to finish, and a failed send (task already gone) is ignored. It always
    /// returns `Ok(())`.
    pub async fn close(self) -> Result<()> {
        let _ = self.cmd_tx.send(EngineCommand::Close).await;
        Ok(())
    }
}
/// Filtered subscription handle created by `OrderbookEngine::view`. It receives
/// the updates the engine task routes to it and offers the same read accessors
/// over the shared orderbook as the engine.
pub struct EngineView {
/// The engine's shared local orderbook (cloned `Arc`, not a copy of the data).
state: Arc<RwLock<LocalOrderbook>>,
/// Routing entry shared with the engine's view list; `set_tokens` rewrites its
/// token filter.
_inner: Arc<RwLock<ViewInner>>,
/// Receiving half of this view's update channel.
rx: mpsc::Receiver<OrderbookUpdate>,
}
impl EngineView {
pub async fn next(&mut self) -> Option<OrderbookUpdate> {
self.rx.recv().await
}
pub async fn set_tokens(&self, token_ids: Vec<String>) {
let mut inner = self._inner.write().await;
inner.tokens = token_ids.into_iter().collect();
}
pub async fn midpoint(&self, token_id: &str) -> Option<f64> {
self.state.read().await.midpoint(token_id)
}
pub async fn spread(&self, token_id: &str) -> Option<f64> {
self.state.read().await.spread(token_id)
}
pub async fn best_bid(&self, token_id: &str) -> Option<OrderbookLevel> {
self.state.read().await.best_bid(token_id).cloned()
}
pub async fn best_ask(&self, token_id: &str) -> Option<OrderbookLevel> {
self.state.read().await.best_ask(token_id).cloned()
}
pub async fn book(&self, token_id: &str) -> Option<(Vec<OrderbookLevel>, Vec<OrderbookLevel>)> {
self.state.read().await
.get_book(token_id)
.map(|(b, a)| (b.to_vec(), a.to_vec()))
}
pub async fn tracked_tokens(&self) -> Vec<String> {
self.state.read().await.tracked_tokens()
}
pub async fn last_change(&self, token_id: &str) -> Option<Instant> {
self.state.read().await.last_change(token_id)
}
pub async fn inactive_since(&self, threshold: Duration) -> Vec<String> {
self.state.read().await.inactive_since(threshold)
}
pub async fn midpoints(&self, token_ids: &[String]) -> HashMap<String, f64> {
self.state.read().await.midpoints(token_ids)
}
pub async fn midpoints_all(&self) -> HashMap<String, f64> {
self.state.read().await.midpoints_all()
}
pub async fn spreads(&self, token_ids: &[String]) -> HashMap<String, f64> {
self.state.read().await.spreads(token_ids)
}
pub async fn spreads_all(&self) -> HashMap<String, f64> {
self.state.read().await.spreads_all()
}
pub async fn best_bids(&self, token_ids: &[String]) -> HashMap<String, OrderbookLevel> {
self.state.read().await.best_bids(token_ids)
}
pub async fn best_bids_all(&self) -> HashMap<String, OrderbookLevel> {
self.state.read().await.best_bids_all()
}
pub async fn best_asks(&self, token_ids: &[String]) -> HashMap<String, OrderbookLevel> {
self.state.read().await.best_asks(token_ids)
}
pub async fn best_asks_all(&self) -> HashMap<String, OrderbookLevel> {
self.state.read().await.best_asks_all()
}
pub async fn books(&self, token_ids: &[String]) -> HashMap<String, (Vec<OrderbookLevel>, Vec<OrderbookLevel>)> {
self.state.read().await.books(token_ids)
}
pub async fn books_all(&self) -> HashMap<String, (Vec<OrderbookLevel>, Vec<OrderbookLevel>)> {
self.state.read().await.books_all()
}
}