alpaca_websocket/
streams.rs1#![allow(missing_docs)]
4
5use crate::messages::*;
6use alpaca_base::types::*;
7use futures_util::stream::Stream;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use tokio::sync::mpsc;
11
12pub struct MarketDataStream {
14 receiver: mpsc::UnboundedReceiver<MarketDataUpdate>,
15}
16
17#[derive(Debug, Clone)]
19pub enum MarketDataUpdate {
20 Trade { symbol: String, trade: Trade },
21 Quote { symbol: String, quote: Quote },
22 Bar { symbol: String, bar: Bar },
23}
24
25impl MarketDataStream {
26 pub fn new(receiver: mpsc::UnboundedReceiver<MarketDataUpdate>) -> Self {
28 Self { receiver }
29 }
30}
31
32impl Stream for MarketDataStream {
33 type Item = MarketDataUpdate;
34
35 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36 self.receiver.poll_recv(cx)
37 }
38}
39
40pub struct TradingStream {
42 receiver: mpsc::UnboundedReceiver<TradeUpdateMessage>,
43}
44
45impl TradingStream {
46 pub fn new(receiver: mpsc::UnboundedReceiver<TradeUpdateMessage>) -> Self {
48 Self { receiver }
49 }
50}
51
52impl Stream for TradingStream {
53 type Item = TradeUpdateMessage;
54
55 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
56 self.receiver.poll_recv(cx)
57 }
58}
59
60pub struct StatusStream {
62 receiver: mpsc::UnboundedReceiver<ConnectionStatus>,
63}
64
65impl StatusStream {
66 pub fn new(receiver: mpsc::UnboundedReceiver<ConnectionStatus>) -> Self {
68 Self { receiver }
69 }
70}
71
72impl Stream for StatusStream {
73 type Item = ConnectionStatus;
74
75 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76 self.receiver.poll_recv(cx)
77 }
78}
79
80pub struct AlpacaStream {
82 receiver: mpsc::UnboundedReceiver<WebSocketMessage>,
83}
84
85impl AlpacaStream {
86 pub fn new(receiver: mpsc::UnboundedReceiver<WebSocketMessage>) -> Self {
88 Self { receiver }
89 }
90
91 pub fn market_data(self) -> impl Stream<Item = MarketDataUpdate> + Unpin {
93 Box::pin(futures_util::stream::StreamExt::filter_map(
94 self,
95 |msg| async move {
96 match msg {
97 WebSocketMessage::Trade(trade_msg) => Some(MarketDataUpdate::Trade {
98 symbol: trade_msg.symbol.clone(),
99 trade: trade_msg.into(),
100 }),
101 WebSocketMessage::Quote(quote_msg) => Some(MarketDataUpdate::Quote {
102 symbol: quote_msg.symbol.clone(),
103 quote: quote_msg.into(),
104 }),
105 WebSocketMessage::Bar(bar_msg) => Some(MarketDataUpdate::Bar {
106 symbol: bar_msg.symbol.clone(),
107 bar: bar_msg.into(),
108 }),
109 _ => None,
110 }
111 },
112 ))
113 }
114
115 pub fn trading_updates(self) -> impl Stream<Item = TradeUpdateMessage> + Unpin {
117 Box::pin(futures_util::stream::StreamExt::filter_map(
118 self,
119 |msg| async move {
120 match msg {
121 WebSocketMessage::TradeUpdate(update) => Some(*update),
122 _ => None,
123 }
124 },
125 ))
126 }
127
128 pub fn status_updates(self) -> impl Stream<Item = ConnectionStatus> + Unpin {
130 Box::pin(futures_util::stream::StreamExt::filter_map(
131 self,
132 |msg| async move {
133 match msg {
134 WebSocketMessage::Connection(conn) => Some(conn.status),
135 _ => None,
136 }
137 },
138 ))
139 }
140}
141
142impl Stream for AlpacaStream {
143 type Item = WebSocketMessage;
144
145 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
146 self.receiver.poll_recv(cx)
147 }
148}