use futures::stream::Stream;
use serde::Deserialize;
use std::pin::Pin;
use crate::Client;
use crate::Error;
/// Status-only payload: a required `Status` string with an optional `Message`.
///
/// Rust field names are mapped to PascalCase JSON keys by `rename_all`.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamStatus {
/// Value of the `Status` key. The field is not optional, so deserialization
/// fails when the key is missing.
pub status: String,
/// Value of the `Message` key; `None` when the key is absent.
#[serde(default)]
pub message: Option<String>,
}
/// One JSON line of the quote stream returned by `Client::stream_quotes`.
///
/// Rust field names are mapped to PascalCase JSON keys by `rename_all`
/// (`trade_time` is read from `TradeTime`). Every field is an `Option`, and
/// values are kept as strings; nothing in this type parses them into numbers.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamQuote {
/// Value of the `Symbol` key.
pub symbol: Option<String>,
/// Value of the `Last` key.
pub last: Option<String>,
/// Value of the `Ask` key.
pub ask: Option<String>,
/// Value of the `Bid` key.
pub bid: Option<String>,
/// Value of the `Volume` key.
pub volume: Option<String>,
/// Value of the `TradeTime` key; `None` when absent.
#[serde(default)]
pub trade_time: Option<String>,
/// Value of the `Status` key; `None` when absent. `is_status` reports whether
/// it is set and `is_go_away` compares it with `"GoAway"`.
#[serde(default)]
pub status: Option<String>,
}
impl StreamQuote {
    /// Returns `true` when the message carries a `status` value of any kind.
    pub fn is_status(&self) -> bool {
        let Self { status, .. } = self;
        status.is_some()
    }

    /// Returns `true` only when `status` is exactly the string `"GoAway"`
    /// (the comparison is case-sensitive).
    pub fn is_go_away(&self) -> bool {
        matches!(self.status.as_deref(), Some("GoAway"))
    }
}
/// One JSON line of the bar-chart stream returned by `Client::stream_bars`.
///
/// Rust field names are mapped to PascalCase JSON keys by `rename_all`
/// (`time_stamp` is read from `TimeStamp`, `total_volume` from `TotalVolume`).
/// Every field is an `Option`, and values are kept as strings; nothing in this
/// type parses them.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamBar {
/// Value of the `High` key.
pub high: Option<String>,
/// Value of the `Low` key.
pub low: Option<String>,
/// Value of the `Open` key.
pub open: Option<String>,
/// Value of the `Close` key.
pub close: Option<String>,
/// Value of the `TimeStamp` key.
pub time_stamp: Option<String>,
/// Value of the `TotalVolume` key.
pub total_volume: Option<String>,
/// Value of the `Status` key; `None` when absent. `is_status` reports whether
/// it is set.
#[serde(default)]
pub status: Option<String>,
}
impl StreamBar {
    /// Returns `true` when the message carries a `status` value of any kind.
    pub fn is_status(&self) -> bool {
        let Self { status, .. } = self;
        status.is_some()
    }
}
/// One JSON line of the stream returned by `Client::stream_market_depth_quotes`.
///
/// Rust field names are mapped to PascalCase JSON keys by `rename_all`
/// (`ask_size` is read from `AskSize`, `bid_size` from `BidSize`). Every field
/// is an `Option`, and values are kept as strings; nothing in this type parses
/// them.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamMarketDepthQuote {
/// Value of the `Symbol` key.
pub symbol: Option<String>,
/// Value of the `Ask` key.
pub ask: Option<String>,
/// Value of the `AskSize` key.
pub ask_size: Option<String>,
/// Value of the `Bid` key.
pub bid: Option<String>,
/// Value of the `BidSize` key.
pub bid_size: Option<String>,
/// Value of the `Side` key; `None` when absent.
#[serde(default)]
pub side: Option<String>,
/// Value of the `Status` key; `None` when absent. `is_status` reports whether
/// it is set.
#[serde(default)]
pub status: Option<String>,
}
impl StreamMarketDepthQuote {
    /// Returns `true` when the message carries a `status` value of any kind.
    pub fn is_status(&self) -> bool {
        let Self { status, .. } = self;
        status.is_some()
    }
}
/// One JSON line of the stream returned by
/// `Client::stream_market_depth_aggregates`.
///
/// Rust field names are mapped to PascalCase JSON keys by `rename_all`
/// (`total_ask_size` is read from `TotalAskSize`, `total_bid_size` from
/// `TotalBidSize`). Every field is an `Option`; all but `levels` are kept as
/// strings.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamMarketDepthAggregate {
/// Value of the `Symbol` key.
pub symbol: Option<String>,
/// Value of the `TotalAskSize` key.
pub total_ask_size: Option<String>,
/// Value of the `TotalBidSize` key.
pub total_bid_size: Option<String>,
/// Value of the `Levels` key, deserialized as an unsigned 32-bit integer;
/// `None` when absent.
#[serde(default)]
pub levels: Option<u32>,
/// Value of the `Status` key; `None` when absent. `is_status` reports whether
/// it is set.
#[serde(default)]
pub status: Option<String>,
}
impl StreamMarketDepthAggregate {
    /// Returns `true` when the message carries a `status` value of any kind.
    pub fn is_status(&self) -> bool {
        let Self { status, .. } = self;
        status.is_some()
    }
}
/// One JSON line of the stream returned by `Client::stream_option_chains`.
///
/// Rust field names are mapped to PascalCase JSON keys by `rename_all`, except
/// `option_type`, which is explicitly renamed to the `Type` key. Every field is
/// an `Option`, and values are kept as strings; nothing in this type parses
/// them.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamOptionChain {
/// Value of the `Symbol` key.
pub symbol: Option<String>,
/// Value of the `Underlying` key.
pub underlying: Option<String>,
/// Value of the `Type` key (the explicit `rename` overrides the PascalCase
/// mapping, which would otherwise look for `OptionType`); `None` when absent.
#[serde(default, rename = "Type")]
pub option_type: Option<String>,
/// Value of the `StrikePrice` key.
pub strike_price: Option<String>,
/// Value of the `ExpirationDate` key.
pub expiration_date: Option<String>,
/// Value of the `Bid` key.
pub bid: Option<String>,
/// Value of the `Ask` key.
pub ask: Option<String>,
/// Value of the `Last` key.
pub last: Option<String>,
/// Value of the `Status` key; `None` when absent. `is_status` reports whether
/// it is set.
#[serde(default)]
pub status: Option<String>,
}
impl StreamOptionChain {
    /// Returns `true` when the message carries a `status` value of any kind.
    pub fn is_status(&self) -> bool {
        let Self { status, .. } = self;
        status.is_some()
    }
}
/// One JSON line of the stream returned by `Client::stream_option_quotes`.
///
/// Rust field names are mapped to PascalCase JSON keys by `rename_all`
/// (`open_interest` is read from `OpenInterest`). Every field is an `Option`,
/// and values are kept as strings; nothing in this type parses them.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamOptionQuote {
/// Value of the `Symbol` key.
pub symbol: Option<String>,
/// Value of the `Bid` key.
pub bid: Option<String>,
/// Value of the `Ask` key.
pub ask: Option<String>,
/// Value of the `Last` key.
pub last: Option<String>,
/// Value of the `Volume` key.
pub volume: Option<String>,
/// Value of the `OpenInterest` key; `None` when absent.
#[serde(default)]
pub open_interest: Option<String>,
/// Value of the `Status` key; `None` when absent. `is_status` reports whether
/// it is set.
#[serde(default)]
pub status: Option<String>,
}
impl StreamOptionQuote {
    /// Returns `true` when the message carries a `status` value of any kind.
    pub fn is_status(&self) -> bool {
        let Self { status, .. } = self;
        status.is_some()
    }
}
/// One JSON line of the streams returned by `Client::stream_orders` and
/// `Client::stream_orders_by_id`.
///
/// Rust field names are mapped to PascalCase JSON keys by `rename_all`
/// (for example `order_id` is read from `OrderID`-style casing as `OrderId`).
/// Every field is an `Option`, and values are kept as strings; nothing in this
/// type parses them.
// NOTE(review): PascalCase mapping yields `OrderId` / `AccountId`; confirm the
// service does not emit `OrderID` / `AccountID`, which would never match.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamOrder {
/// Value of the `OrderId` key.
pub order_id: Option<String>,
/// Value of the `AccountId` key.
pub account_id: Option<String>,
/// Value of the `Symbol` key.
pub symbol: Option<String>,
/// Value of the `Quantity` key.
pub quantity: Option<String>,
/// Value of the `OrderType` key.
pub order_type: Option<String>,
/// Value of the `OrderStatus` key; `None` when absent. This is a separate
/// field from `status` below.
#[serde(default)]
pub order_status: Option<String>,
/// Value of the `FilledQuantity` key; `None` when absent.
#[serde(default)]
pub filled_quantity: Option<String>,
/// Value of the `Status` key; `None` when absent. `is_status` reports whether
/// it is set.
#[serde(default)]
pub status: Option<String>,
}
impl StreamOrder {
    /// Returns `true` when the message carries a `status` value of any kind.
    ///
    /// Only the `status` field is inspected; `order_status` plays no part.
    pub fn is_status(&self) -> bool {
        let Self { status, .. } = self;
        status.is_some()
    }
}
/// One JSON line of the stream returned by `Client::stream_positions`.
///
/// Rust field names are mapped to PascalCase JSON keys by `rename_all`
/// (`unrealized_profit_loss` is read from `UnrealizedProfitLoss`). Every field
/// is an `Option`, and values are kept as strings; nothing in this type parses
/// them.
// NOTE(review): PascalCase mapping yields `AccountId`; confirm the service does
// not emit `AccountID`, which would never match.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamPosition {
/// Value of the `AccountId` key.
pub account_id: Option<String>,
/// Value of the `Symbol` key.
pub symbol: Option<String>,
/// Value of the `Quantity` key.
pub quantity: Option<String>,
/// Value of the `AveragePrice` key.
pub average_price: Option<String>,
/// Value of the `Last` key.
pub last: Option<String>,
/// Value of the `UnrealizedProfitLoss` key; `None` when absent.
#[serde(default)]
pub unrealized_profit_loss: Option<String>,
/// Value of the `Status` key; `None` when absent. `is_status` reports whether
/// it is set.
#[serde(default)]
pub status: Option<String>,
}
impl StreamPosition {
    /// Returns `true` when the message carries a `status` value of any kind.
    pub fn is_status(&self) -> bool {
        let Self { status, .. } = self;
        status.is_some()
    }
}
/// Pinned, heap-allocated, `Send` stream whose items are `Result<T, Error>`.
///
/// This is the return type of every `Client::stream_*` method in this file.
pub type BoxStream<T> = Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>;
impl Client {
pub async fn stream_quotes(
&mut self,
symbols: &[&str],
) -> Result<BoxStream<StreamQuote>, Error> {
let symbols_str = symbols.join(",");
let path = format!("/v3/marketdata/stream/quotes/{}", symbols_str);
let headers = self.auth_headers().await?;
let url = format!("{}{}", self.base_url(), &path);
let resp = self.http.get(&url).headers(headers).send().await?;
if !resp.status().is_success() {
let status = resp.status().as_u16();
let body = resp.text().await.unwrap_or_default();
return Err(Error::Api {
status,
message: body,
});
}
let stream = async_stream::try_stream! {
let mut bytes_stream = resp.bytes_stream();
let mut buffer = String::new();
use futures::StreamExt;
while let Some(chunk) = bytes_stream.next().await {
let chunk = chunk.map_err(Error::Http)?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(newline_pos) = buffer.find('\n') {
let line = buffer[..newline_pos].trim().to_string();
buffer = buffer[newline_pos + 1..].to_string();
if line.is_empty() {
continue;
}
match serde_json::from_str::<StreamQuote>(&line) {
Ok(quote) => yield quote,
Err(e) => {
tracing::warn!("Failed to parse stream quote: {e}, line: {line}");
}
}
}
}
};
Ok(Box::pin(stream))
}
pub async fn stream_bars(
&mut self,
symbol: &str,
interval: &str,
unit: &str,
) -> Result<BoxStream<StreamBar>, Error> {
let path = format!(
"/v3/marketdata/stream/barcharts/{}?interval={}&unit={}",
symbol, interval, unit
);
let headers = self.auth_headers().await?;
let url = format!("{}{}", self.base_url(), &path);
let resp = self.http.get(&url).headers(headers).send().await?;
if !resp.status().is_success() {
let status = resp.status().as_u16();
let body = resp.text().await.unwrap_or_default();
return Err(Error::Api {
status,
message: body,
});
}
let stream = async_stream::try_stream! {
let mut bytes_stream = resp.bytes_stream();
let mut buffer = String::new();
use futures::StreamExt;
while let Some(chunk) = bytes_stream.next().await {
let chunk = chunk.map_err(Error::Http)?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(newline_pos) = buffer.find('\n') {
let line = buffer[..newline_pos].trim().to_string();
buffer = buffer[newline_pos + 1..].to_string();
if line.is_empty() {
continue;
}
match serde_json::from_str::<StreamBar>(&line) {
Ok(bar) => yield bar,
Err(e) => {
tracing::warn!("Failed to parse stream bar: {e}, line: {line}");
}
}
}
}
};
Ok(Box::pin(stream))
}
pub async fn stream_market_depth_quotes(
&mut self,
symbol: &str,
) -> Result<BoxStream<StreamMarketDepthQuote>, Error> {
let path = format!("/v3/marketdata/stream/marketdepth/quotes/{}", symbol);
let headers = self.auth_headers().await?;
let url = format!("{}{}", self.base_url(), &path);
let resp = self.http.get(&url).headers(headers).send().await?;
if !resp.status().is_success() {
let status = resp.status().as_u16();
let body = resp.text().await.unwrap_or_default();
return Err(Error::Api {
status,
message: body,
});
}
let stream = async_stream::try_stream! {
let mut bytes_stream = resp.bytes_stream();
let mut buffer = String::new();
use futures::StreamExt;
while let Some(chunk) = bytes_stream.next().await {
let chunk = chunk.map_err(Error::Http)?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(newline_pos) = buffer.find('\n') {
let line = buffer[..newline_pos].trim().to_string();
buffer = buffer[newline_pos + 1..].to_string();
if line.is_empty() {
continue;
}
match serde_json::from_str::<StreamMarketDepthQuote>(&line) {
Ok(item) => yield item,
Err(e) => {
tracing::warn!("Failed to parse stream market depth quote: {e}, line: {line}");
}
}
}
}
};
Ok(Box::pin(stream))
}
pub async fn stream_market_depth_aggregates(
&mut self,
symbol: &str,
) -> Result<BoxStream<StreamMarketDepthAggregate>, Error> {
let path = format!("/v3/marketdata/stream/marketdepth/aggregates/{}", symbol);
let headers = self.auth_headers().await?;
let url = format!("{}{}", self.base_url(), &path);
let resp = self.http.get(&url).headers(headers).send().await?;
if !resp.status().is_success() {
let status = resp.status().as_u16();
let body = resp.text().await.unwrap_or_default();
return Err(Error::Api {
status,
message: body,
});
}
let stream = async_stream::try_stream! {
let mut bytes_stream = resp.bytes_stream();
let mut buffer = String::new();
use futures::StreamExt;
while let Some(chunk) = bytes_stream.next().await {
let chunk = chunk.map_err(Error::Http)?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(newline_pos) = buffer.find('\n') {
let line = buffer[..newline_pos].trim().to_string();
buffer = buffer[newline_pos + 1..].to_string();
if line.is_empty() {
continue;
}
match serde_json::from_str::<StreamMarketDepthAggregate>(&line) {
Ok(item) => yield item,
Err(e) => {
tracing::warn!("Failed to parse stream market depth aggregate: {e}, line: {line}");
}
}
}
}
};
Ok(Box::pin(stream))
}
pub async fn stream_option_chains(
&mut self,
underlying: &str,
) -> Result<BoxStream<StreamOptionChain>, Error> {
let path = format!("/v3/marketdata/stream/options/chains/{}", underlying);
let headers = self.auth_headers().await?;
let url = format!("{}{}", self.base_url(), &path);
let resp = self.http.get(&url).headers(headers).send().await?;
if !resp.status().is_success() {
let status = resp.status().as_u16();
let body = resp.text().await.unwrap_or_default();
return Err(Error::Api {
status,
message: body,
});
}
let stream = async_stream::try_stream! {
let mut bytes_stream = resp.bytes_stream();
let mut buffer = String::new();
use futures::StreamExt;
while let Some(chunk) = bytes_stream.next().await {
let chunk = chunk.map_err(Error::Http)?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(newline_pos) = buffer.find('\n') {
let line = buffer[..newline_pos].trim().to_string();
buffer = buffer[newline_pos + 1..].to_string();
if line.is_empty() {
continue;
}
match serde_json::from_str::<StreamOptionChain>(&line) {
Ok(item) => yield item,
Err(e) => {
tracing::warn!("Failed to parse stream option chain: {e}, line: {line}");
}
}
}
}
};
Ok(Box::pin(stream))
}
pub async fn stream_option_quotes(
&mut self,
legs: &[&str],
) -> Result<BoxStream<StreamOptionQuote>, Error> {
let legs_str = legs.join(",");
let path = format!("/v3/marketdata/stream/options/quotes/{}", legs_str);
let headers = self.auth_headers().await?;
let url = format!("{}{}", self.base_url(), &path);
let resp = self.http.get(&url).headers(headers).send().await?;
if !resp.status().is_success() {
let status = resp.status().as_u16();
let body = resp.text().await.unwrap_or_default();
return Err(Error::Api {
status,
message: body,
});
}
let stream = async_stream::try_stream! {
let mut bytes_stream = resp.bytes_stream();
let mut buffer = String::new();
use futures::StreamExt;
while let Some(chunk) = bytes_stream.next().await {
let chunk = chunk.map_err(Error::Http)?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(newline_pos) = buffer.find('\n') {
let line = buffer[..newline_pos].trim().to_string();
buffer = buffer[newline_pos + 1..].to_string();
if line.is_empty() {
continue;
}
match serde_json::from_str::<StreamOptionQuote>(&line) {
Ok(item) => yield item,
Err(e) => {
tracing::warn!("Failed to parse stream option quote: {e}, line: {line}");
}
}
}
}
};
Ok(Box::pin(stream))
}
pub async fn stream_orders(
&mut self,
account_ids: &[&str],
) -> Result<BoxStream<StreamOrder>, Error> {
let ids = account_ids.join(",");
let path = format!("/v3/brokerage/stream/accounts/{}/orders", ids);
let headers = self.auth_headers().await?;
let url = format!("{}{}", self.base_url(), &path);
let resp = self.http.get(&url).headers(headers).send().await?;
if !resp.status().is_success() {
let status = resp.status().as_u16();
let body = resp.text().await.unwrap_or_default();
return Err(Error::Api {
status,
message: body,
});
}
let stream = async_stream::try_stream! {
let mut bytes_stream = resp.bytes_stream();
let mut buffer = String::new();
use futures::StreamExt;
while let Some(chunk) = bytes_stream.next().await {
let chunk = chunk.map_err(Error::Http)?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(newline_pos) = buffer.find('\n') {
let line = buffer[..newline_pos].trim().to_string();
buffer = buffer[newline_pos + 1..].to_string();
if line.is_empty() {
continue;
}
match serde_json::from_str::<StreamOrder>(&line) {
Ok(item) => yield item,
Err(e) => {
tracing::warn!("Failed to parse stream order: {e}, line: {line}");
}
}
}
}
};
Ok(Box::pin(stream))
}
pub async fn stream_orders_by_id(
&mut self,
account_ids: &[&str],
order_ids: &[&str],
) -> Result<BoxStream<StreamOrder>, Error> {
let ids = account_ids.join(",");
let oids = order_ids.join(",");
let path = format!("/v3/brokerage/stream/accounts/{}/orders/{}", ids, oids);
let headers = self.auth_headers().await?;
let url = format!("{}{}", self.base_url(), &path);
let resp = self.http.get(&url).headers(headers).send().await?;
if !resp.status().is_success() {
let status = resp.status().as_u16();
let body = resp.text().await.unwrap_or_default();
return Err(Error::Api {
status,
message: body,
});
}
let stream = async_stream::try_stream! {
let mut bytes_stream = resp.bytes_stream();
let mut buffer = String::new();
use futures::StreamExt;
while let Some(chunk) = bytes_stream.next().await {
let chunk = chunk.map_err(Error::Http)?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(newline_pos) = buffer.find('\n') {
let line = buffer[..newline_pos].trim().to_string();
buffer = buffer[newline_pos + 1..].to_string();
if line.is_empty() {
continue;
}
match serde_json::from_str::<StreamOrder>(&line) {
Ok(item) => yield item,
Err(e) => {
tracing::warn!("Failed to parse stream order: {e}, line: {line}");
}
}
}
}
};
Ok(Box::pin(stream))
}
pub async fn stream_positions(
&mut self,
account_ids: &[&str],
) -> Result<BoxStream<StreamPosition>, Error> {
let ids = account_ids.join(",");
let path = format!("/v3/brokerage/stream/accounts/{}/positions", ids);
let headers = self.auth_headers().await?;
let url = format!("{}{}", self.base_url(), &path);
let resp = self.http.get(&url).headers(headers).send().await?;
if !resp.status().is_success() {
let status = resp.status().as_u16();
let body = resp.text().await.unwrap_or_default();
return Err(Error::Api {
status,
message: body,
});
}
let stream = async_stream::try_stream! {
let mut bytes_stream = resp.bytes_stream();
let mut buffer = String::new();
use futures::StreamExt;
while let Some(chunk) = bytes_stream.next().await {
let chunk = chunk.map_err(Error::Http)?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(newline_pos) = buffer.find('\n') {
let line = buffer[..newline_pos].trim().to_string();
buffer = buffer[newline_pos + 1..].to_string();
if line.is_empty() {
continue;
}
match serde_json::from_str::<StreamPosition>(&line) {
Ok(item) => yield item,
Err(e) => {
tracing::warn!("Failed to parse stream position: {e}, line: {line}");
}
}
}
}
};
Ok(Box::pin(stream))
}
}