#![allow(missing_docs)]
use crate::messages::*;
use alpaca_base::types::*;
use futures_util::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
pub struct MarketDataStream {
receiver: mpsc::UnboundedReceiver<MarketDataUpdate>,
}
#[derive(Debug, Clone)]
pub enum MarketDataUpdate {
Trade { symbol: String, trade: Trade },
Quote { symbol: String, quote: Quote },
Bar { symbol: String, bar: Bar },
}
impl MarketDataStream {
pub fn new(receiver: mpsc::UnboundedReceiver<MarketDataUpdate>) -> Self {
Self { receiver }
}
}
impl Stream for MarketDataStream {
type Item = MarketDataUpdate;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}
pub struct TradingStream {
receiver: mpsc::UnboundedReceiver<TradeUpdateMessage>,
}
impl TradingStream {
pub fn new(receiver: mpsc::UnboundedReceiver<TradeUpdateMessage>) -> Self {
Self { receiver }
}
}
impl Stream for TradingStream {
type Item = TradeUpdateMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}
pub struct StatusStream {
receiver: mpsc::UnboundedReceiver<ConnectionStatus>,
}
impl StatusStream {
pub fn new(receiver: mpsc::UnboundedReceiver<ConnectionStatus>) -> Self {
Self { receiver }
}
}
impl Stream for StatusStream {
type Item = ConnectionStatus;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}
pub struct AlpacaStream {
receiver: mpsc::UnboundedReceiver<WebSocketMessage>,
}
impl AlpacaStream {
pub fn new(receiver: mpsc::UnboundedReceiver<WebSocketMessage>) -> Self {
Self { receiver }
}
pub fn market_data(self) -> impl Stream<Item = MarketDataUpdate> + Unpin {
Box::pin(futures_util::stream::StreamExt::filter_map(
self,
|msg| async move {
match msg {
WebSocketMessage::Trade(trade_msg) => Some(MarketDataUpdate::Trade {
symbol: trade_msg.symbol.clone(),
trade: trade_msg.into(),
}),
WebSocketMessage::Quote(quote_msg) => Some(MarketDataUpdate::Quote {
symbol: quote_msg.symbol.clone(),
quote: quote_msg.into(),
}),
WebSocketMessage::Bar(bar_msg) => Some(MarketDataUpdate::Bar {
symbol: bar_msg.symbol.clone(),
bar: bar_msg.into(),
}),
_ => None,
}
},
))
}
pub fn trading_updates(self) -> impl Stream<Item = TradeUpdateMessage> + Unpin {
Box::pin(futures_util::stream::StreamExt::filter_map(
self,
|msg| async move {
match msg {
WebSocketMessage::TradeUpdate(update) => Some(*update),
_ => None,
}
},
))
}
pub fn status_updates(self) -> impl Stream<Item = ConnectionStatus> + Unpin {
Box::pin(futures_util::stream::StreamExt::filter_map(
self,
|msg| async move {
match msg {
WebSocketMessage::Connection(conn) => Some(conn.status),
_ => None,
}
},
))
}
}
impl Stream for AlpacaStream {
type Item = WebSocketMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}