1use futures::stream::Stream;
31use serde::Deserialize;
32use std::pin::Pin;
33
34use crate::Client;
35use crate::Error;
36
/// A status message, deserialized from a JSON object with PascalCase keys.
///
/// NOTE(review): nothing in this file constructs or parses this type; confirm
/// where it is used and that the server really sends a `Status` key (the
/// upstream API may name it `StreamStatus`).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamStatus {
    /// Value of the required `Status` key.
    pub status: String,
    /// Value of the `Message` key; `None` when the key is absent.
    #[serde(default)]
    pub message: Option<String>,
}
50
/// One line of the quote stream, as parsed by `Client::stream_quotes`.
///
/// Keys are matched in PascalCase. Every field is optional, so a line that
/// carries only some of the keys still deserializes; missing keys become
/// `None`. Values are stored as the strings received; no numeric parsing is
/// done here.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamQuote {
    /// `Symbol` key.
    pub symbol: Option<String>,
    /// `Last` key.
    pub last: Option<String>,
    /// `Ask` key.
    pub ask: Option<String>,
    /// `Bid` key.
    pub bid: Option<String>,
    /// `Volume` key.
    pub volume: Option<String>,
    /// `TradeTime` key.
    #[serde(default)]
    pub trade_time: Option<String>,
    /// `Status` key. `is_status` reports whether it is present and
    /// `is_go_away` compares it with `"GoAway"`.
    ///
    /// NOTE(review): confirm the server sends this under `Status` rather than
    /// `StreamStatus`; if not, this field is never populated.
    #[serde(default)]
    pub status: Option<String>,
}
75
76impl StreamQuote {
77 pub fn is_status(&self) -> bool {
79 self.status.is_some()
80 }
81
82 pub fn is_go_away(&self) -> bool {
84 self.status.as_deref() == Some("GoAway")
85 }
86}
87
/// One line of the bar-chart stream, as parsed by `Client::stream_bars`.
///
/// Keys are matched in PascalCase; every field is optional and missing keys
/// become `None`. Values are stored as the strings received.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamBar {
    /// `High` key.
    pub high: Option<String>,
    /// `Low` key.
    pub low: Option<String>,
    /// `Open` key.
    pub open: Option<String>,
    /// `Close` key.
    pub close: Option<String>,
    /// `TimeStamp` key.
    pub time_stamp: Option<String>,
    /// `TotalVolume` key.
    pub total_volume: Option<String>,
    /// `Status` key; its presence is what `is_status` tests.
    #[serde(default)]
    pub status: Option<String>,
}
110
111impl StreamBar {
112 pub fn is_status(&self) -> bool {
114 self.status.is_some()
115 }
116}
117
/// One line of the market-depth quote stream, as parsed by
/// `Client::stream_market_depth_quotes`.
///
/// Keys are matched in PascalCase; every field is optional and missing keys
/// become `None`. Values are stored as the strings received.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamMarketDepthQuote {
    /// `Symbol` key.
    pub symbol: Option<String>,
    /// `Ask` key.
    pub ask: Option<String>,
    /// `AskSize` key.
    pub ask_size: Option<String>,
    /// `Bid` key.
    pub bid: Option<String>,
    /// `BidSize` key.
    pub bid_size: Option<String>,
    /// `Side` key.
    #[serde(default)]
    pub side: Option<String>,
    /// `Status` key; its presence is what `is_status` tests.
    #[serde(default)]
    pub status: Option<String>,
}
141
142impl StreamMarketDepthQuote {
143 pub fn is_status(&self) -> bool {
145 self.status.is_some()
146 }
147}
148
/// One line of the market-depth aggregate stream, as parsed by
/// `Client::stream_market_depth_aggregates`.
///
/// Keys are matched in PascalCase; every field is optional and missing keys
/// become `None`.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamMarketDepthAggregate {
    /// `Symbol` key.
    pub symbol: Option<String>,
    /// `TotalAskSize` key, stored as the string received.
    pub total_ask_size: Option<String>,
    /// `TotalBidSize` key, stored as the string received.
    pub total_bid_size: Option<String>,
    /// `Levels` key. Unlike the other fields this must be a JSON number that
    /// fits in `u32`, otherwise the whole line fails to deserialize.
    #[serde(default)]
    pub levels: Option<u32>,
    /// `Status` key; its presence is what `is_status` tests.
    #[serde(default)]
    pub status: Option<String>,
}
168
169impl StreamMarketDepthAggregate {
170 pub fn is_status(&self) -> bool {
172 self.status.is_some()
173 }
174}
175
/// One line of the option-chain stream, as parsed by
/// `Client::stream_option_chains`.
///
/// Keys are matched in PascalCase unless a field overrides its name; every
/// field is optional and missing keys become `None`. Values are stored as the
/// strings received.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamOptionChain {
    /// `Symbol` key.
    pub symbol: Option<String>,
    /// `Underlying` key.
    pub underlying: Option<String>,
    /// Read from the `Type` key; the Rust field cannot be called `type`
    /// because that is a keyword, hence the explicit rename.
    #[serde(default, rename = "Type")]
    pub option_type: Option<String>,
    /// `StrikePrice` key.
    pub strike_price: Option<String>,
    /// `ExpirationDate` key.
    pub expiration_date: Option<String>,
    /// `Bid` key.
    pub bid: Option<String>,
    /// `Ask` key.
    pub ask: Option<String>,
    /// `Last` key.
    pub last: Option<String>,
    /// `Status` key; its presence is what `is_status` tests.
    #[serde(default)]
    pub status: Option<String>,
}
203
204impl StreamOptionChain {
205 pub fn is_status(&self) -> bool {
207 self.status.is_some()
208 }
209}
210
/// One line of the option-quote stream, as parsed by
/// `Client::stream_option_quotes`.
///
/// Keys are matched in PascalCase; every field is optional and missing keys
/// become `None`. Values are stored as the strings received.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct StreamOptionQuote {
    /// `Symbol` key.
    pub symbol: Option<String>,
    /// `Bid` key.
    pub bid: Option<String>,
    /// `Ask` key.
    pub ask: Option<String>,
    /// `Last` key.
    pub last: Option<String>,
    /// `Volume` key.
    pub volume: Option<String>,
    /// `OpenInterest` key.
    #[serde(default)]
    pub open_interest: Option<String>,
    /// `Status` key; its presence is what `is_status` tests.
    #[serde(default)]
    pub status: Option<String>,
}
234
235impl StreamOptionQuote {
236 pub fn is_status(&self) -> bool {
238 self.status.is_some()
239 }
240}
241
242#[derive(Debug, Clone, Deserialize)]
246#[serde(rename_all = "PascalCase")]
247pub struct StreamOrder {
248 pub order_id: Option<String>,
250 pub account_id: Option<String>,
252 pub symbol: Option<String>,
254 pub quantity: Option<String>,
256 pub order_type: Option<String>,
258 #[serde(default)]
260 pub order_status: Option<String>,
261 #[serde(default)]
263 pub filled_quantity: Option<String>,
264 #[serde(default)]
266 pub status: Option<String>,
267}
268
269impl StreamOrder {
270 pub fn is_status(&self) -> bool {
272 self.status.is_some()
273 }
274}
275
276#[derive(Debug, Clone, Deserialize)]
280#[serde(rename_all = "PascalCase")]
281pub struct StreamPosition {
282 pub account_id: Option<String>,
284 pub symbol: Option<String>,
286 pub quantity: Option<String>,
288 pub average_price: Option<String>,
290 pub last: Option<String>,
292 #[serde(default)]
294 pub unrealized_profit_loss: Option<String>,
295 #[serde(default)]
297 pub status: Option<String>,
298}
299
300impl StreamPosition {
301 pub fn is_status(&self) -> bool {
303 self.status.is_some()
304 }
305}
306
/// Heap-allocated, pinned, `Send` stream whose items are `Result<T, Error>`.
///
/// This is the return type of every `Client::stream_*` method in this file.
pub type BoxStream<T> = Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>;
312
313impl Client {
314 pub async fn stream_quotes(
319 &mut self,
320 symbols: &[&str],
321 ) -> Result<BoxStream<StreamQuote>, Error> {
322 let symbols_str = symbols.join(",");
323 let path = format!("/v3/marketdata/stream/quotes/{}", symbols_str);
324 let headers = self.auth_headers().await?;
325 let url = format!("{}{}", self.base_url(), &path);
326
327 let resp = self.http.get(&url).headers(headers).send().await?;
328
329 if !resp.status().is_success() {
330 let status = resp.status().as_u16();
331 let body = resp.text().await.unwrap_or_default();
332 return Err(Error::Api {
333 status,
334 message: body,
335 });
336 }
337
338 let stream = async_stream::try_stream! {
339 let mut bytes_stream = resp.bytes_stream();
340 let mut buffer = String::new();
341
342 use futures::StreamExt;
343 while let Some(chunk) = bytes_stream.next().await {
344 let chunk = chunk.map_err(Error::Http)?;
345 buffer.push_str(&String::from_utf8_lossy(&chunk));
346
347 while let Some(newline_pos) = buffer.find('\n') {
349 let line = buffer[..newline_pos].trim().to_string();
350 buffer = buffer[newline_pos + 1..].to_string();
351
352 if line.is_empty() {
353 continue;
354 }
355
356 match serde_json::from_str::<StreamQuote>(&line) {
357 Ok(quote) => yield quote,
358 Err(e) => {
359 tracing::warn!("Failed to parse stream quote: {e}, line: {line}");
360 }
361 }
362 }
363 }
364 };
365
366 Ok(Box::pin(stream))
367 }
368
369 pub async fn stream_bars(
377 &mut self,
378 symbol: &str,
379 interval: &str,
380 unit: &str,
381 ) -> Result<BoxStream<StreamBar>, Error> {
382 let path = format!(
383 "/v3/marketdata/stream/barcharts/{}?interval={}&unit={}",
384 symbol, interval, unit
385 );
386 let headers = self.auth_headers().await?;
387 let url = format!("{}{}", self.base_url(), &path);
388
389 let resp = self.http.get(&url).headers(headers).send().await?;
390
391 if !resp.status().is_success() {
392 let status = resp.status().as_u16();
393 let body = resp.text().await.unwrap_or_default();
394 return Err(Error::Api {
395 status,
396 message: body,
397 });
398 }
399
400 let stream = async_stream::try_stream! {
401 let mut bytes_stream = resp.bytes_stream();
402 let mut buffer = String::new();
403
404 use futures::StreamExt;
405 while let Some(chunk) = bytes_stream.next().await {
406 let chunk = chunk.map_err(Error::Http)?;
407 buffer.push_str(&String::from_utf8_lossy(&chunk));
408
409 while let Some(newline_pos) = buffer.find('\n') {
410 let line = buffer[..newline_pos].trim().to_string();
411 buffer = buffer[newline_pos + 1..].to_string();
412
413 if line.is_empty() {
414 continue;
415 }
416
417 match serde_json::from_str::<StreamBar>(&line) {
418 Ok(bar) => yield bar,
419 Err(e) => {
420 tracing::warn!("Failed to parse stream bar: {e}, line: {line}");
421 }
422 }
423 }
424 }
425 };
426
427 Ok(Box::pin(stream))
428 }
429
430 pub async fn stream_market_depth_quotes(
432 &mut self,
433 symbol: &str,
434 ) -> Result<BoxStream<StreamMarketDepthQuote>, Error> {
435 let path = format!("/v3/marketdata/stream/marketdepth/quotes/{}", symbol);
436 let headers = self.auth_headers().await?;
437 let url = format!("{}{}", self.base_url(), &path);
438
439 let resp = self.http.get(&url).headers(headers).send().await?;
440
441 if !resp.status().is_success() {
442 let status = resp.status().as_u16();
443 let body = resp.text().await.unwrap_or_default();
444 return Err(Error::Api {
445 status,
446 message: body,
447 });
448 }
449
450 let stream = async_stream::try_stream! {
451 let mut bytes_stream = resp.bytes_stream();
452 let mut buffer = String::new();
453
454 use futures::StreamExt;
455 while let Some(chunk) = bytes_stream.next().await {
456 let chunk = chunk.map_err(Error::Http)?;
457 buffer.push_str(&String::from_utf8_lossy(&chunk));
458
459 while let Some(newline_pos) = buffer.find('\n') {
460 let line = buffer[..newline_pos].trim().to_string();
461 buffer = buffer[newline_pos + 1..].to_string();
462
463 if line.is_empty() {
464 continue;
465 }
466
467 match serde_json::from_str::<StreamMarketDepthQuote>(&line) {
468 Ok(item) => yield item,
469 Err(e) => {
470 tracing::warn!("Failed to parse stream market depth quote: {e}, line: {line}");
471 }
472 }
473 }
474 }
475 };
476
477 Ok(Box::pin(stream))
478 }
479
480 pub async fn stream_market_depth_aggregates(
482 &mut self,
483 symbol: &str,
484 ) -> Result<BoxStream<StreamMarketDepthAggregate>, Error> {
485 let path = format!("/v3/marketdata/stream/marketdepth/aggregates/{}", symbol);
486 let headers = self.auth_headers().await?;
487 let url = format!("{}{}", self.base_url(), &path);
488
489 let resp = self.http.get(&url).headers(headers).send().await?;
490
491 if !resp.status().is_success() {
492 let status = resp.status().as_u16();
493 let body = resp.text().await.unwrap_or_default();
494 return Err(Error::Api {
495 status,
496 message: body,
497 });
498 }
499
500 let stream = async_stream::try_stream! {
501 let mut bytes_stream = resp.bytes_stream();
502 let mut buffer = String::new();
503
504 use futures::StreamExt;
505 while let Some(chunk) = bytes_stream.next().await {
506 let chunk = chunk.map_err(Error::Http)?;
507 buffer.push_str(&String::from_utf8_lossy(&chunk));
508
509 while let Some(newline_pos) = buffer.find('\n') {
510 let line = buffer[..newline_pos].trim().to_string();
511 buffer = buffer[newline_pos + 1..].to_string();
512
513 if line.is_empty() {
514 continue;
515 }
516
517 match serde_json::from_str::<StreamMarketDepthAggregate>(&line) {
518 Ok(item) => yield item,
519 Err(e) => {
520 tracing::warn!("Failed to parse stream market depth aggregate: {e}, line: {line}");
521 }
522 }
523 }
524 }
525 };
526
527 Ok(Box::pin(stream))
528 }
529
530 pub async fn stream_option_chains(
532 &mut self,
533 underlying: &str,
534 ) -> Result<BoxStream<StreamOptionChain>, Error> {
535 let path = format!("/v3/marketdata/stream/options/chains/{}", underlying);
536 let headers = self.auth_headers().await?;
537 let url = format!("{}{}", self.base_url(), &path);
538
539 let resp = self.http.get(&url).headers(headers).send().await?;
540
541 if !resp.status().is_success() {
542 let status = resp.status().as_u16();
543 let body = resp.text().await.unwrap_or_default();
544 return Err(Error::Api {
545 status,
546 message: body,
547 });
548 }
549
550 let stream = async_stream::try_stream! {
551 let mut bytes_stream = resp.bytes_stream();
552 let mut buffer = String::new();
553
554 use futures::StreamExt;
555 while let Some(chunk) = bytes_stream.next().await {
556 let chunk = chunk.map_err(Error::Http)?;
557 buffer.push_str(&String::from_utf8_lossy(&chunk));
558
559 while let Some(newline_pos) = buffer.find('\n') {
560 let line = buffer[..newline_pos].trim().to_string();
561 buffer = buffer[newline_pos + 1..].to_string();
562
563 if line.is_empty() {
564 continue;
565 }
566
567 match serde_json::from_str::<StreamOptionChain>(&line) {
568 Ok(item) => yield item,
569 Err(e) => {
570 tracing::warn!("Failed to parse stream option chain: {e}, line: {line}");
571 }
572 }
573 }
574 }
575 };
576
577 Ok(Box::pin(stream))
578 }
579
580 pub async fn stream_option_quotes(
582 &mut self,
583 legs: &[&str],
584 ) -> Result<BoxStream<StreamOptionQuote>, Error> {
585 let legs_str = legs.join(",");
586 let path = format!("/v3/marketdata/stream/options/quotes/{}", legs_str);
587 let headers = self.auth_headers().await?;
588 let url = format!("{}{}", self.base_url(), &path);
589
590 let resp = self.http.get(&url).headers(headers).send().await?;
591
592 if !resp.status().is_success() {
593 let status = resp.status().as_u16();
594 let body = resp.text().await.unwrap_or_default();
595 return Err(Error::Api {
596 status,
597 message: body,
598 });
599 }
600
601 let stream = async_stream::try_stream! {
602 let mut bytes_stream = resp.bytes_stream();
603 let mut buffer = String::new();
604
605 use futures::StreamExt;
606 while let Some(chunk) = bytes_stream.next().await {
607 let chunk = chunk.map_err(Error::Http)?;
608 buffer.push_str(&String::from_utf8_lossy(&chunk));
609
610 while let Some(newline_pos) = buffer.find('\n') {
611 let line = buffer[..newline_pos].trim().to_string();
612 buffer = buffer[newline_pos + 1..].to_string();
613
614 if line.is_empty() {
615 continue;
616 }
617
618 match serde_json::from_str::<StreamOptionQuote>(&line) {
619 Ok(item) => yield item,
620 Err(e) => {
621 tracing::warn!("Failed to parse stream option quote: {e}, line: {line}");
622 }
623 }
624 }
625 }
626 };
627
628 Ok(Box::pin(stream))
629 }
630
631 pub async fn stream_orders(
633 &mut self,
634 account_ids: &[&str],
635 ) -> Result<BoxStream<StreamOrder>, Error> {
636 let ids = account_ids.join(",");
637 let path = format!("/v3/brokerage/stream/accounts/{}/orders", ids);
638 let headers = self.auth_headers().await?;
639 let url = format!("{}{}", self.base_url(), &path);
640
641 let resp = self.http.get(&url).headers(headers).send().await?;
642
643 if !resp.status().is_success() {
644 let status = resp.status().as_u16();
645 let body = resp.text().await.unwrap_or_default();
646 return Err(Error::Api {
647 status,
648 message: body,
649 });
650 }
651
652 let stream = async_stream::try_stream! {
653 let mut bytes_stream = resp.bytes_stream();
654 let mut buffer = String::new();
655
656 use futures::StreamExt;
657 while let Some(chunk) = bytes_stream.next().await {
658 let chunk = chunk.map_err(Error::Http)?;
659 buffer.push_str(&String::from_utf8_lossy(&chunk));
660
661 while let Some(newline_pos) = buffer.find('\n') {
662 let line = buffer[..newline_pos].trim().to_string();
663 buffer = buffer[newline_pos + 1..].to_string();
664
665 if line.is_empty() {
666 continue;
667 }
668
669 match serde_json::from_str::<StreamOrder>(&line) {
670 Ok(item) => yield item,
671 Err(e) => {
672 tracing::warn!("Failed to parse stream order: {e}, line: {line}");
673 }
674 }
675 }
676 }
677 };
678
679 Ok(Box::pin(stream))
680 }
681
682 pub async fn stream_orders_by_id(
684 &mut self,
685 account_ids: &[&str],
686 order_ids: &[&str],
687 ) -> Result<BoxStream<StreamOrder>, Error> {
688 let ids = account_ids.join(",");
689 let oids = order_ids.join(",");
690 let path = format!("/v3/brokerage/stream/accounts/{}/orders/{}", ids, oids);
691 let headers = self.auth_headers().await?;
692 let url = format!("{}{}", self.base_url(), &path);
693
694 let resp = self.http.get(&url).headers(headers).send().await?;
695
696 if !resp.status().is_success() {
697 let status = resp.status().as_u16();
698 let body = resp.text().await.unwrap_or_default();
699 return Err(Error::Api {
700 status,
701 message: body,
702 });
703 }
704
705 let stream = async_stream::try_stream! {
706 let mut bytes_stream = resp.bytes_stream();
707 let mut buffer = String::new();
708
709 use futures::StreamExt;
710 while let Some(chunk) = bytes_stream.next().await {
711 let chunk = chunk.map_err(Error::Http)?;
712 buffer.push_str(&String::from_utf8_lossy(&chunk));
713
714 while let Some(newline_pos) = buffer.find('\n') {
715 let line = buffer[..newline_pos].trim().to_string();
716 buffer = buffer[newline_pos + 1..].to_string();
717
718 if line.is_empty() {
719 continue;
720 }
721
722 match serde_json::from_str::<StreamOrder>(&line) {
723 Ok(item) => yield item,
724 Err(e) => {
725 tracing::warn!("Failed to parse stream order: {e}, line: {line}");
726 }
727 }
728 }
729 }
730 };
731
732 Ok(Box::pin(stream))
733 }
734
735 pub async fn stream_positions(
737 &mut self,
738 account_ids: &[&str],
739 ) -> Result<BoxStream<StreamPosition>, Error> {
740 let ids = account_ids.join(",");
741 let path = format!("/v3/brokerage/stream/accounts/{}/positions", ids);
742 let headers = self.auth_headers().await?;
743 let url = format!("{}{}", self.base_url(), &path);
744
745 let resp = self.http.get(&url).headers(headers).send().await?;
746
747 if !resp.status().is_success() {
748 let status = resp.status().as_u16();
749 let body = resp.text().await.unwrap_or_default();
750 return Err(Error::Api {
751 status,
752 message: body,
753 });
754 }
755
756 let stream = async_stream::try_stream! {
757 let mut bytes_stream = resp.bytes_stream();
758 let mut buffer = String::new();
759
760 use futures::StreamExt;
761 while let Some(chunk) = bytes_stream.next().await {
762 let chunk = chunk.map_err(Error::Http)?;
763 buffer.push_str(&String::from_utf8_lossy(&chunk));
764
765 while let Some(newline_pos) = buffer.find('\n') {
766 let line = buffer[..newline_pos].trim().to_string();
767 buffer = buffer[newline_pos + 1..].to_string();
768
769 if line.is_empty() {
770 continue;
771 }
772
773 match serde_json::from_str::<StreamPosition>(&line) {
774 Ok(item) => yield item,
775 Err(e) => {
776 tracing::warn!("Failed to parse stream position: {e}, line: {line}");
777 }
778 }
779 }
780 }
781 };
782
783 Ok(Box::pin(stream))
784 }
785}