use crate::get_auth;
use crate::trading::AccountType;
use super::connection::*;
use super::error::StreamError;
use super::types::TradeUpdate;
pub struct TradeUpdateStream {
account_type: AccountType,
}
impl TradeUpdateStream {
pub fn new(account_type: AccountType) -> Self {
Self { account_type }
}
pub fn start<F>(self, mut handler: F) -> Result<(), StreamError>
where
F: FnMut(TradeUpdate),
{
let url = match self.account_type {
AccountType::Paper => "wss://paper-api.alpaca.markets/stream",
AccountType::Live => "wss://api.alpaca.markets/stream",
};
let (key, secret) = get_auth();
let mut socket = ws_connect(url)?;
auth_trade_updates(&mut socket, &key, &secret)?;
let listen_msg = serde_json::json!({
"action": "listen",
"data": {
"streams": ["trade_updates"]
}
});
ws_send(&mut socket, &listen_msg)?;
loop {
let text = ws_read_text(&mut socket)?;
let parsed: serde_json::Value = serde_json::from_str(&text)?;
if parsed.get("stream").and_then(|v| v.as_str()) == Some("trade_updates") {
if let Some(data) = parsed.get("data") {
let update: TradeUpdate = serde_json::from_value(data.clone())?;
handler(update);
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
#[ignore] fn test_trade_update_stream_connects() {
let _ = TradeUpdateStream::new(AccountType::Paper).start(|update| {
dbg!(&update);
});
}
}