use std::sync::{Arc, Mutex, MutexGuard};
use crate::order_book::OrderBook;
use crate::api::ApiClient;
pub struct LiveOrderBook {
order_book: Arc<Mutex<OrderBook>>,
}
pub enum BookState<'a> {
Live(MutexGuard<'a, OrderBook>),
Disconnected,
}
impl LiveOrderBook {
pub fn new<C: ApiClient>(stream: C::Stream) -> LiveOrderBook {
use std::thread;
use futures::prelude::*;
use crate::api::Notification;
let order_book = Arc::new(Mutex::new(OrderBook::new()));
let weak = order_book.clone();
let (sender, receiver) = std::sync::mpsc::sync_channel(0);
thread::spawn(move || {
let weak = Arc::downgrade(&weak);
let mut snapshot = false;
let fut = stream.for_each(|notif| {
if let Notification::LimitUpdates(updates) = notif {
if let Some(order_book) = weak.upgrade() {
let mut order_book = order_book.lock().unwrap();
for update in updates {
order_book.update(update.into_inner());
}
if !snapshot {
sender.send(()).unwrap();
snapshot = true;
}
} else {
return Err(());
}
}
Ok(())
});
use tokio::runtime::current_thread;
let _ = current_thread::block_on_all(fut);
});
let _ = receiver.recv();
LiveOrderBook {
order_book,
}
}
pub fn order_book(&self) -> BookState<'_> {
if Arc::weak_count(&self.order_book) == 0 {
BookState::Disconnected
} else {
BookState::Live(self.order_book.lock().unwrap())
}
}
}