use crate::get_auth;
use super::connection::*;
use super::error::StreamError;
use super::types::*;
/// Builder-style configuration for a stock market-data WebSocket stream.
///
/// Each `Vec<String>` field holds the symbols that `start` places under the
/// corresponding key of the `subscribe` message.
pub struct StockStream {
/// Feed selector; its `to_string()` output becomes the last path segment of the stream URL.
feed: Feed,
/// Symbols sent under the `"trades"` key.
trades: Vec<String>,
/// Symbols sent under the `"quotes"` key.
quotes: Vec<String>,
/// Symbols sent under the `"bars"` key.
bars: Vec<String>,
/// Symbols sent under the `"dailyBars"` key.
daily_bars: Vec<String>,
/// Symbols sent under the `"updatedBars"` key.
updated_bars: Vec<String>,
/// Symbols sent under the `"statuses"` key.
statuses: Vec<String>,
}
impl StockStream {
    /// Creates a stream configuration for `feed` with no symbols selected on
    /// any channel.
    pub fn new(feed: Feed) -> Self {
        Self {
            feed,
            trades: vec![],
            quotes: vec![],
            bars: vec![],
            daily_bars: vec![],
            updated_bars: vec![],
            statuses: vec![],
        }
    }

    /// Converts borrowed symbol names into owned strings, keeping their order.
    fn owned_symbols(symbols: Vec<&str>) -> Vec<String> {
        symbols.into_iter().map(str::to_owned).collect()
    }

    /// Sets the symbols for the `trades` channel, replacing any list set earlier.
    pub fn subscribe_trades(mut self, symbols: Vec<&str>) -> Self {
        self.trades = Self::owned_symbols(symbols);
        self
    }

    /// Sets the symbols for the `quotes` channel, replacing any list set earlier.
    pub fn subscribe_quotes(mut self, symbols: Vec<&str>) -> Self {
        self.quotes = Self::owned_symbols(symbols);
        self
    }

    /// Sets the symbols for the `bars` channel, replacing any list set earlier.
    pub fn subscribe_bars(mut self, symbols: Vec<&str>) -> Self {
        self.bars = Self::owned_symbols(symbols);
        self
    }

    /// Sets the symbols for the `dailyBars` channel, replacing any list set earlier.
    pub fn subscribe_daily_bars(mut self, symbols: Vec<&str>) -> Self {
        self.daily_bars = Self::owned_symbols(symbols);
        self
    }

    /// Sets the symbols for the `updatedBars` channel, replacing any list set earlier.
    pub fn subscribe_updated_bars(mut self, symbols: Vec<&str>) -> Self {
        self.updated_bars = Self::owned_symbols(symbols);
        self
    }

    /// Sets the symbols for the `statuses` channel, replacing any list set earlier.
    pub fn subscribe_statuses(mut self, symbols: Vec<&str>) -> Self {
        self.statuses = Self::owned_symbols(symbols);
        self
    }

    /// Connects, authenticates, subscribes, and then feeds every parsed
    /// message to `handler` until an error occurs.
    ///
    /// The read loop has no exit of its own, so this function only returns
    /// when one of the fallible steps fails; `Ok(())` is never produced.
    ///
    /// # Errors
    ///
    /// Propagates the error from connecting, authenticating, sending the
    /// subscribe message, reading a text frame, or parsing a frame.
    pub fn start<F>(self, mut handler: F) -> Result<(), StreamError>
    where
        F: FnMut(MarketDataMessage),
    {
        let feed_segment = self.feed.to_string();
        let endpoint = format!("wss://stream.data.alpaca.markets/v2/{}", feed_segment);

        let (key, secret) = get_auth();
        let mut socket = ws_connect(&endpoint)?;
        auth_market_data(&mut socket, &key, &secret)?;

        let subscription = serde_json::json!({
            "action": "subscribe",
            "trades": self.trades,
            "quotes": self.quotes,
            "bars": self.bars,
            "dailyBars": self.daily_bars,
            "updatedBars": self.updated_bars,
            "statuses": self.statuses,
        });
        ws_send(&mut socket, &subscription)?;

        // The first text frame after subscribing is read and dropped without
        // inspection. NOTE(review): presumably this is the subscription
        // acknowledgement; a server-side rejection here would go unnoticed.
        let _first_reply = ws_read_text(&mut socket)?;

        loop {
            let frame = ws_read_text(&mut socket)?;
            for event in parse_market_data_messages(&frame)? {
                handler(event);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Live smoke test: subscribes to bars for `FAKEPACA` on the test feed and
    /// waits for three messages.
    ///
    /// `start` never returns on its own while the stream is healthy, so the
    /// handler panics to break out once enough messages have arrived. That
    /// panic is the success signal, hence `should_panic`: if `start` instead
    /// returns an error (which is discarded below), no panic happens and the
    /// test fails rather than passing silently.
    ///
    /// Ignored by default because `start` opens a real WebSocket connection.
    #[test]
    #[ignore]
    #[should_panic(expected = "Received 3 messages successfully")]
    fn test_stock_stream_connects() {
        let mut count = 0;
        let _ = StockStream::new(Feed::Test)
            .subscribe_bars(vec!["FAKEPACA"])
            .start(|msg| {
                dbg!(&msg);
                count += 1;
                if count >= 3 {
                    panic!("Received 3 messages successfully");
                }
            });
    }
}