alpaca_websocket/
streams.rs

1//! Stream types for WebSocket data.
2
3#![allow(missing_docs)]
4
5use crate::messages::*;
6use alpaca_base::types::*;
7use futures_util::stream::Stream;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use tokio::sync::mpsc;
11
12/// Stream of market data updates
13pub struct MarketDataStream {
14    receiver: mpsc::UnboundedReceiver<MarketDataUpdate>,
15}
16
17/// Market data update enum
18#[derive(Debug, Clone)]
19pub enum MarketDataUpdate {
20    Trade { symbol: String, trade: Trade },
21    Quote { symbol: String, quote: Quote },
22    Bar { symbol: String, bar: Bar },
23}
24
25impl MarketDataStream {
26    /// Create a new market data stream
27    pub fn new(receiver: mpsc::UnboundedReceiver<MarketDataUpdate>) -> Self {
28        Self { receiver }
29    }
30}
31
32impl Stream for MarketDataStream {
33    type Item = MarketDataUpdate;
34
35    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36        self.receiver.poll_recv(cx)
37    }
38}
39
40/// Stream of trading updates
41pub struct TradingStream {
42    receiver: mpsc::UnboundedReceiver<TradeUpdateMessage>,
43}
44
45impl TradingStream {
46    /// Create a new trading stream
47    pub fn new(receiver: mpsc::UnboundedReceiver<TradeUpdateMessage>) -> Self {
48        Self { receiver }
49    }
50}
51
52impl Stream for TradingStream {
53    type Item = TradeUpdateMessage;
54
55    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
56        self.receiver.poll_recv(cx)
57    }
58}
59
60/// Stream of connection status updates
61pub struct StatusStream {
62    receiver: mpsc::UnboundedReceiver<ConnectionStatus>,
63}
64
65impl StatusStream {
66    /// Create a new status stream
67    pub fn new(receiver: mpsc::UnboundedReceiver<ConnectionStatus>) -> Self {
68        Self { receiver }
69    }
70}
71
72impl Stream for StatusStream {
73    type Item = ConnectionStatus;
74
75    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76        self.receiver.poll_recv(cx)
77    }
78}
79
80/// Combined stream of all WebSocket messages
81pub struct AlpacaStream {
82    receiver: mpsc::UnboundedReceiver<WebSocketMessage>,
83}
84
85impl AlpacaStream {
86    /// Create a new Alpaca stream
87    pub fn new(receiver: mpsc::UnboundedReceiver<WebSocketMessage>) -> Self {
88        Self { receiver }
89    }
90
91    /// Filter for market data updates only
92    pub fn market_data(self) -> impl Stream<Item = MarketDataUpdate> + Unpin {
93        Box::pin(futures_util::stream::StreamExt::filter_map(
94            self,
95            |msg| async move {
96                match msg {
97                    WebSocketMessage::Trade(trade_msg) => Some(MarketDataUpdate::Trade {
98                        symbol: trade_msg.symbol.clone(),
99                        trade: trade_msg.into(),
100                    }),
101                    WebSocketMessage::Quote(quote_msg) => Some(MarketDataUpdate::Quote {
102                        symbol: quote_msg.symbol.clone(),
103                        quote: quote_msg.into(),
104                    }),
105                    WebSocketMessage::Bar(bar_msg) => Some(MarketDataUpdate::Bar {
106                        symbol: bar_msg.symbol.clone(),
107                        bar: bar_msg.into(),
108                    }),
109                    _ => None,
110                }
111            },
112        ))
113    }
114
115    /// Filter for trading updates only
116    pub fn trading_updates(self) -> impl Stream<Item = TradeUpdateMessage> + Unpin {
117        Box::pin(futures_util::stream::StreamExt::filter_map(
118            self,
119            |msg| async move {
120                match msg {
121                    WebSocketMessage::TradeUpdate(update) => Some(*update),
122                    _ => None,
123                }
124            },
125        ))
126    }
127
128    /// Filter for status updates only
129    pub fn status_updates(self) -> impl Stream<Item = ConnectionStatus> + Unpin {
130        Box::pin(futures_util::stream::StreamExt::filter_map(
131            self,
132            |msg| async move {
133                match msg {
134                    WebSocketMessage::Connection(conn) => Some(conn.status),
135                    _ => None,
136                }
137            },
138        ))
139    }
140}
141
142impl Stream for AlpacaStream {
143    type Item = WebSocketMessage;
144
145    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
146        self.receiver.poll_recv(cx)
147    }
148}