use self::{consolidated_level_book::ConsolidatedLevelBook, level_book::LevelBook};
use crate::{
config::{Common, Cpty},
protocol::book::{MessageHeader, Snapshot, Updates},
symbology::TradableProduct,
};
use anyhow::{anyhow, bail, Result};
use futures::channel::mpsc;
use fxhash::FxHashMap;
use log::warn;
use netidx::{
pack::Pack,
pool::Pooled,
subscriber::{Dval, Event, SubId, UpdatesFlags, Value},
};
use std::ops::Deref;
pub mod consolidated_level_book;
pub mod level_book;
pub struct BookClient {
book: LevelBook,
subscription: Dval,
synced: bool,
}
impl Deref for BookClient {
type Target = LevelBook;
fn deref(&self) -> &Self::Target {
&self.book
}
}
impl BookClient {
pub async fn detect_legacy(common: &Common, tp: TradableProduct) -> bool {
let (path, _) = tp.subscriber_paths();
let base = common.paths.qf_rt(Some(Cpty { venue: tp.venue, route: tp.route }));
let path = base.append(&path).append("buy/0/price"); let resolver = common.subscriber.resolver();
match resolver.resolve([path.clone()]).await {
Ok((pb, _)) => pb.len() > 0,
Err(e) => {
warn!("failed to resolve {}, {:?}", path, e);
false
}
}
}
pub fn new(
common: &Common,
tp: TradableProduct,
up: mpsc::Sender<Pooled<Vec<(SubId, Event)>>>,
) -> Self {
let (path, _) = tp.subscriber_paths();
let base = common.paths.qf_rt(Some(Cpty { venue: tp.venue, route: tp.route }));
let path = base.append(&path).append("book");
let already = common.subscriber.is_subscribed_or_pending(&path);
let subscription =
common.subscriber.subscribe_updates(path, [(UpdatesFlags::empty(), up)]);
if already || subscription.strong_count() > 1 {
subscription.write(Value::Null);
}
Self { book: LevelBook::default(), subscription, synced: false }
}
pub fn id(&self) -> SubId {
self.subscription.id()
}
pub fn synced(&self) -> bool {
self.synced
}
pub fn process_event(&mut self, ev: Event) -> Result<()> {
match ev {
Event::Update(Value::Bytes(mut buf)) => {
let typ: MessageHeader = Pack::decode(&mut buf)?;
match typ {
MessageHeader::Updates => {
if self.synced {
let updates: Updates = Pack::decode(&mut buf)?;
self.book.update(updates)
}
}
MessageHeader::Snapshot => {
let snap: Snapshot = Pack::decode(&mut buf)?;
self.synced = true;
self.book.update_from_snapshot(snap)
}
}
}
Event::Update(Value::Null) | Event::Unsubscribed => (),
e => bail!("book protocol error, invalid event {:?}", e),
}
Ok(())
}
}
pub struct ConsolidatedBookClient {
consolidated_book: ConsolidatedLevelBook,
books: FxHashMap<SubId, (TradableProduct, BookClient)>,
}
impl Deref for ConsolidatedBookClient {
type Target = ConsolidatedLevelBook;
fn deref(&self) -> &Self::Target {
&self.consolidated_book
}
}
impl ConsolidatedBookClient {
pub fn new(
common: &Common,
tps: Vec<TradableProduct>,
up: mpsc::Sender<Pooled<Vec<(SubId, Event)>>>,
) -> Self {
let mut books: FxHashMap<SubId, (TradableProduct, BookClient)> =
FxHashMap::default();
tps.iter().for_each(|tp| {
let client = BookClient::new(common, *tp, up.clone());
books.insert(client.id(), (*tp, client));
});
Self { consolidated_book: ConsolidatedLevelBook::default(), books }
}
pub fn process_event(&mut self, sub_id: SubId, ev: Event) -> Result<()> {
let (tp, book_client) = self
.books
.get_mut(&sub_id)
.ok_or_else(|| anyhow!("missing book for sub_id: {:?}", sub_id))?;
book_client.process_event(ev.clone())?;
match ev {
Event::Update(Value::Bytes(mut buf)) => {
let typ: MessageHeader = Pack::decode(&mut buf)?;
match typ {
MessageHeader::Updates => {
if book_client.synced() {
let updates: Updates = Pack::decode(&mut buf)?;
self.consolidated_book.update(*tp, updates)
}
}
MessageHeader::Snapshot => {
let snap: Snapshot = Pack::decode(&mut buf)?;
self.consolidated_book.update_from_snapshot(*tp, snap)
}
}
}
Event::Update(Value::Null) | Event::Unsubscribed => (),
e => bail!("book protocol error, invalid event {:?}", e),
}
Ok(())
}
}