Skip to main content

tradestation_api/
streaming.rs

1//! HTTP chunked-transfer streaming for TradeStation v3.
2//!
3//! TradeStation uses HTTP streaming (NOT WebSocket):
4//! - Content-Type: `application/vnd.tradestation.streams.v3+json`
5//! - Newline-delimited JSON objects
6//! - `StreamStatus` messages signal snapshot boundaries (`EndSnapshot`) and
7//!   reconnect requests (`GoAway`)
8//!
9//! All stream methods return a [`BoxStream`] of typed updates that can be
10//! consumed with `futures::StreamExt`.
11//!
12//! # Example
13//!
14//! ```no_run
15//! # use tradestation_api::{Client, Credentials};
16//! # async fn example(client: &mut Client) -> Result<(), Box<dyn std::error::Error>> {
17//! use futures::StreamExt;
18//!
19//! let mut stream = client.stream_quotes(&["AAPL"]).await?;
20//! while let Some(result) = stream.next().await {
21//!     let quote = result?;
22//!     if !quote.is_status() {
23//!         println!("{}: {}", quote.symbol.as_deref().unwrap_or("?"), quote.last.as_deref().unwrap_or("?"));
24//!     }
25//! }
26//! # Ok(())
27//! # }
28//! ```
29
30use futures::stream::Stream;
31use serde::Deserialize;
32use std::pin::Pin;
33
34use crate::Client;
35use crate::Error;
36
37/// Stream status messages from TradeStation.
38///
39/// These appear inline in the stream data to signal events like end-of-snapshot
40/// or a server-initiated disconnect.
41#[derive(Debug, Clone, Deserialize)]
42#[serde(rename_all = "PascalCase")]
43pub struct StreamStatus {
44    /// Status code: "EndSnapshot", "GoAway", etc.
45    pub status: String,
46    /// Optional message with additional details.
47    #[serde(default)]
48    pub message: Option<String>,
49}
50
51/// A streaming quote update.
52///
53/// Contains real-time quote data or a stream status message. Use [`StreamQuote::is_status`]
54/// to distinguish between the two.
55#[derive(Debug, Clone, Deserialize)]
56#[serde(rename_all = "PascalCase")]
57pub struct StreamQuote {
58    /// Ticker symbol.
59    pub symbol: Option<String>,
60    /// Last traded price.
61    pub last: Option<String>,
62    /// Best ask price.
63    pub ask: Option<String>,
64    /// Best bid price.
65    pub bid: Option<String>,
66    /// Cumulative volume.
67    pub volume: Option<String>,
68    /// Time of the last trade.
69    #[serde(default)]
70    pub trade_time: Option<String>,
71    /// Stream status (present only for status messages).
72    #[serde(default)]
73    pub status: Option<String>,
74}
75
76impl StreamQuote {
77    /// Whether this is a status message rather than a quote update.
78    pub fn is_status(&self) -> bool {
79        self.status.is_some()
80    }
81
82    /// Whether this is a GoAway message indicating reconnection is needed.
83    pub fn is_go_away(&self) -> bool {
84        self.status.as_deref() == Some("GoAway")
85    }
86}
87
88/// A streaming bar (OHLCV) update.
89///
90/// Delivered via [`Client::stream_bars`]. Contains partial or completed bar data.
91#[derive(Debug, Clone, Deserialize)]
92#[serde(rename_all = "PascalCase")]
93pub struct StreamBar {
94    /// Highest price during the bar period.
95    pub high: Option<String>,
96    /// Lowest price during the bar period.
97    pub low: Option<String>,
98    /// Opening price.
99    pub open: Option<String>,
100    /// Closing price (updates in real time for the current bar).
101    pub close: Option<String>,
102    /// Bar timestamp.
103    pub time_stamp: Option<String>,
104    /// Total volume during the bar period.
105    pub total_volume: Option<String>,
106    /// Stream status (present only for status messages).
107    #[serde(default)]
108    pub status: Option<String>,
109}
110
111impl StreamBar {
112    /// Whether this is a status message rather than bar data.
113    pub fn is_status(&self) -> bool {
114        self.status.is_some()
115    }
116}
117
118/// A streaming market depth (Level 2) quote.
119///
120/// Delivered via [`Client::stream_market_depth_quotes`].
121#[derive(Debug, Clone, Deserialize)]
122#[serde(rename_all = "PascalCase")]
123pub struct StreamMarketDepthQuote {
124    /// Ticker symbol.
125    pub symbol: Option<String>,
126    /// Ask price at this depth level.
127    pub ask: Option<String>,
128    /// Ask size at this depth level.
129    pub ask_size: Option<String>,
130    /// Bid price at this depth level.
131    pub bid: Option<String>,
132    /// Bid size at this depth level.
133    pub bid_size: Option<String>,
134    /// Side of the book ("Ask" or "Bid").
135    #[serde(default)]
136    pub side: Option<String>,
137    /// Stream status (present only for status messages).
138    #[serde(default)]
139    pub status: Option<String>,
140}
141
142impl StreamMarketDepthQuote {
143    /// Whether this is a status message rather than depth data.
144    pub fn is_status(&self) -> bool {
145        self.status.is_some()
146    }
147}
148
149/// A streaming market depth aggregate summary.
150///
151/// Delivered via [`Client::stream_market_depth_aggregates`].
152#[derive(Debug, Clone, Deserialize)]
153#[serde(rename_all = "PascalCase")]
154pub struct StreamMarketDepthAggregate {
155    /// Ticker symbol.
156    pub symbol: Option<String>,
157    /// Total ask size across all levels.
158    pub total_ask_size: Option<String>,
159    /// Total bid size across all levels.
160    pub total_bid_size: Option<String>,
161    /// Number of price levels.
162    #[serde(default)]
163    pub levels: Option<u32>,
164    /// Stream status (present only for status messages).
165    #[serde(default)]
166    pub status: Option<String>,
167}
168
169impl StreamMarketDepthAggregate {
170    /// Whether this is a status message rather than aggregate data.
171    pub fn is_status(&self) -> bool {
172        self.status.is_some()
173    }
174}
175
176/// A streaming option chain update.
177///
178/// Delivered via [`Client::stream_option_chains`].
179#[derive(Debug, Clone, Deserialize)]
180#[serde(rename_all = "PascalCase")]
181pub struct StreamOptionChain {
182    /// Option symbol.
183    pub symbol: Option<String>,
184    /// Underlying ticker symbol.
185    pub underlying: Option<String>,
186    /// Option type ("Call" or "Put").
187    #[serde(default, rename = "Type")]
188    pub option_type: Option<String>,
189    /// Strike price.
190    pub strike_price: Option<String>,
191    /// Expiration date.
192    pub expiration_date: Option<String>,
193    /// Best bid price.
194    pub bid: Option<String>,
195    /// Best ask price.
196    pub ask: Option<String>,
197    /// Last traded price.
198    pub last: Option<String>,
199    /// Stream status (present only for status messages).
200    #[serde(default)]
201    pub status: Option<String>,
202}
203
204impl StreamOptionChain {
205    /// Whether this is a status message rather than chain data.
206    pub fn is_status(&self) -> bool {
207        self.status.is_some()
208    }
209}
210
211/// A streaming option quote update.
212///
213/// Delivered via [`Client::stream_option_quotes`].
214#[derive(Debug, Clone, Deserialize)]
215#[serde(rename_all = "PascalCase")]
216pub struct StreamOptionQuote {
217    /// Option symbol.
218    pub symbol: Option<String>,
219    /// Best bid price.
220    pub bid: Option<String>,
221    /// Best ask price.
222    pub ask: Option<String>,
223    /// Last traded price.
224    pub last: Option<String>,
225    /// Cumulative volume.
226    pub volume: Option<String>,
227    /// Open interest.
228    #[serde(default)]
229    pub open_interest: Option<String>,
230    /// Stream status (present only for status messages).
231    #[serde(default)]
232    pub status: Option<String>,
233}
234
235impl StreamOptionQuote {
236    /// Whether this is a status message rather than quote data.
237    pub fn is_status(&self) -> bool {
238        self.status.is_some()
239    }
240}
241
242/// A streaming order status update.
243///
244/// Delivered via [`Client::stream_orders`] and [`Client::stream_orders_by_id`].
245#[derive(Debug, Clone, Deserialize)]
246#[serde(rename_all = "PascalCase")]
247pub struct StreamOrder {
248    /// Order identifier.
249    pub order_id: Option<String>,
250    /// Account this order belongs to.
251    pub account_id: Option<String>,
252    /// Ticker symbol.
253    pub symbol: Option<String>,
254    /// Ordered quantity.
255    pub quantity: Option<String>,
256    /// Order type.
257    pub order_type: Option<String>,
258    /// Current order status.
259    #[serde(default)]
260    pub order_status: Option<String>,
261    /// Filled quantity.
262    #[serde(default)]
263    pub filled_quantity: Option<String>,
264    /// Stream status (present only for status messages).
265    #[serde(default)]
266    pub status: Option<String>,
267}
268
269impl StreamOrder {
270    /// Whether this is a status message rather than an order update.
271    pub fn is_status(&self) -> bool {
272        self.status.is_some()
273    }
274}
275
276/// A streaming position update.
277///
278/// Delivered via [`Client::stream_positions`].
279#[derive(Debug, Clone, Deserialize)]
280#[serde(rename_all = "PascalCase")]
281pub struct StreamPosition {
282    /// Account holding this position.
283    pub account_id: Option<String>,
284    /// Ticker symbol.
285    pub symbol: Option<String>,
286    /// Current position quantity.
287    pub quantity: Option<String>,
288    /// Average entry price.
289    pub average_price: Option<String>,
290    /// Last traded price.
291    pub last: Option<String>,
292    /// Unrealized P&L.
293    #[serde(default)]
294    pub unrealized_profit_loss: Option<String>,
295    /// Stream status (present only for status messages).
296    #[serde(default)]
297    pub status: Option<String>,
298}
299
300impl StreamPosition {
301    /// Whether this is a status message rather than a position update.
302    pub fn is_status(&self) -> bool {
303        self.status.is_some()
304    }
305}
306
307/// Type alias for a boxed async stream of results.
308///
309/// All streaming methods return this type. Consume it with
310/// `futures::StreamExt::next()`.
311pub type BoxStream<T> = Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>;
312
313impl Client {
314    /// Stream live quote updates for one or more symbols.
315    ///
316    /// Returns an async stream of [`StreamQuote`] values including
317    /// `StreamStatus` messages (EndSnapshot, GoAway).
318    pub async fn stream_quotes(
319        &mut self,
320        symbols: &[&str],
321    ) -> Result<BoxStream<StreamQuote>, Error> {
322        let symbols_str = symbols.join(",");
323        let path = format!("/v3/marketdata/stream/quotes/{}", symbols_str);
324        let headers = self.auth_headers().await?;
325        let url = format!("{}{}", self.base_url(), &path);
326
327        let resp = self.http.get(&url).headers(headers).send().await?;
328
329        if !resp.status().is_success() {
330            let status = resp.status().as_u16();
331            let body = resp.text().await.unwrap_or_default();
332            return Err(Error::Api {
333                status,
334                message: body,
335            });
336        }
337
338        let stream = async_stream::try_stream! {
339            let mut bytes_stream = resp.bytes_stream();
340            let mut buffer = String::new();
341
342            use futures::StreamExt;
343            while let Some(chunk) = bytes_stream.next().await {
344                let chunk = chunk.map_err(Error::Http)?;
345                buffer.push_str(&String::from_utf8_lossy(&chunk));
346
347                // Process complete JSON lines
348                while let Some(newline_pos) = buffer.find('\n') {
349                    let line = buffer[..newline_pos].trim().to_string();
350                    buffer = buffer[newline_pos + 1..].to_string();
351
352                    if line.is_empty() {
353                        continue;
354                    }
355
356                    match serde_json::from_str::<StreamQuote>(&line) {
357                        Ok(quote) => yield quote,
358                        Err(e) => {
359                            tracing::warn!("Failed to parse stream quote: {e}, line: {line}");
360                        }
361                    }
362                }
363            }
364        };
365
366        Ok(Box::pin(stream))
367    }
368
369    /// Stream live bar updates for a symbol.
370    ///
371    /// # Parameters
372    ///
373    /// - `symbol`: Ticker symbol (e.g., "AAPL")
374    /// - `interval`: Bar interval (e.g., "1", "5")
375    /// - `unit`: Bar unit (e.g., "Minute", "Daily")
376    pub async fn stream_bars(
377        &mut self,
378        symbol: &str,
379        interval: &str,
380        unit: &str,
381    ) -> Result<BoxStream<StreamBar>, Error> {
382        let path = format!(
383            "/v3/marketdata/stream/barcharts/{}?interval={}&unit={}",
384            symbol, interval, unit
385        );
386        let headers = self.auth_headers().await?;
387        let url = format!("{}{}", self.base_url(), &path);
388
389        let resp = self.http.get(&url).headers(headers).send().await?;
390
391        if !resp.status().is_success() {
392            let status = resp.status().as_u16();
393            let body = resp.text().await.unwrap_or_default();
394            return Err(Error::Api {
395                status,
396                message: body,
397            });
398        }
399
400        let stream = async_stream::try_stream! {
401            let mut bytes_stream = resp.bytes_stream();
402            let mut buffer = String::new();
403
404            use futures::StreamExt;
405            while let Some(chunk) = bytes_stream.next().await {
406                let chunk = chunk.map_err(Error::Http)?;
407                buffer.push_str(&String::from_utf8_lossy(&chunk));
408
409                while let Some(newline_pos) = buffer.find('\n') {
410                    let line = buffer[..newline_pos].trim().to_string();
411                    buffer = buffer[newline_pos + 1..].to_string();
412
413                    if line.is_empty() {
414                        continue;
415                    }
416
417                    match serde_json::from_str::<StreamBar>(&line) {
418                        Ok(bar) => yield bar,
419                        Err(e) => {
420                            tracing::warn!("Failed to parse stream bar: {e}, line: {line}");
421                        }
422                    }
423                }
424            }
425        };
426
427        Ok(Box::pin(stream))
428    }
429
430    /// Stream Level 2 market depth quotes for a symbol.
431    pub async fn stream_market_depth_quotes(
432        &mut self,
433        symbol: &str,
434    ) -> Result<BoxStream<StreamMarketDepthQuote>, Error> {
435        let path = format!("/v3/marketdata/stream/marketdepth/quotes/{}", symbol);
436        let headers = self.auth_headers().await?;
437        let url = format!("{}{}", self.base_url(), &path);
438
439        let resp = self.http.get(&url).headers(headers).send().await?;
440
441        if !resp.status().is_success() {
442            let status = resp.status().as_u16();
443            let body = resp.text().await.unwrap_or_default();
444            return Err(Error::Api {
445                status,
446                message: body,
447            });
448        }
449
450        let stream = async_stream::try_stream! {
451            let mut bytes_stream = resp.bytes_stream();
452            let mut buffer = String::new();
453
454            use futures::StreamExt;
455            while let Some(chunk) = bytes_stream.next().await {
456                let chunk = chunk.map_err(Error::Http)?;
457                buffer.push_str(&String::from_utf8_lossy(&chunk));
458
459                while let Some(newline_pos) = buffer.find('\n') {
460                    let line = buffer[..newline_pos].trim().to_string();
461                    buffer = buffer[newline_pos + 1..].to_string();
462
463                    if line.is_empty() {
464                        continue;
465                    }
466
467                    match serde_json::from_str::<StreamMarketDepthQuote>(&line) {
468                        Ok(item) => yield item,
469                        Err(e) => {
470                            tracing::warn!("Failed to parse stream market depth quote: {e}, line: {line}");
471                        }
472                    }
473                }
474            }
475        };
476
477        Ok(Box::pin(stream))
478    }
479
480    /// Stream market depth aggregates for a symbol.
481    pub async fn stream_market_depth_aggregates(
482        &mut self,
483        symbol: &str,
484    ) -> Result<BoxStream<StreamMarketDepthAggregate>, Error> {
485        let path = format!("/v3/marketdata/stream/marketdepth/aggregates/{}", symbol);
486        let headers = self.auth_headers().await?;
487        let url = format!("{}{}", self.base_url(), &path);
488
489        let resp = self.http.get(&url).headers(headers).send().await?;
490
491        if !resp.status().is_success() {
492            let status = resp.status().as_u16();
493            let body = resp.text().await.unwrap_or_default();
494            return Err(Error::Api {
495                status,
496                message: body,
497            });
498        }
499
500        let stream = async_stream::try_stream! {
501            let mut bytes_stream = resp.bytes_stream();
502            let mut buffer = String::new();
503
504            use futures::StreamExt;
505            while let Some(chunk) = bytes_stream.next().await {
506                let chunk = chunk.map_err(Error::Http)?;
507                buffer.push_str(&String::from_utf8_lossy(&chunk));
508
509                while let Some(newline_pos) = buffer.find('\n') {
510                    let line = buffer[..newline_pos].trim().to_string();
511                    buffer = buffer[newline_pos + 1..].to_string();
512
513                    if line.is_empty() {
514                        continue;
515                    }
516
517                    match serde_json::from_str::<StreamMarketDepthAggregate>(&line) {
518                        Ok(item) => yield item,
519                        Err(e) => {
520                            tracing::warn!("Failed to parse stream market depth aggregate: {e}, line: {line}");
521                        }
522                    }
523                }
524            }
525        };
526
527        Ok(Box::pin(stream))
528    }
529
530    /// Stream option chain updates for an underlying symbol.
531    pub async fn stream_option_chains(
532        &mut self,
533        underlying: &str,
534    ) -> Result<BoxStream<StreamOptionChain>, Error> {
535        let path = format!("/v3/marketdata/stream/options/chains/{}", underlying);
536        let headers = self.auth_headers().await?;
537        let url = format!("{}{}", self.base_url(), &path);
538
539        let resp = self.http.get(&url).headers(headers).send().await?;
540
541        if !resp.status().is_success() {
542            let status = resp.status().as_u16();
543            let body = resp.text().await.unwrap_or_default();
544            return Err(Error::Api {
545                status,
546                message: body,
547            });
548        }
549
550        let stream = async_stream::try_stream! {
551            let mut bytes_stream = resp.bytes_stream();
552            let mut buffer = String::new();
553
554            use futures::StreamExt;
555            while let Some(chunk) = bytes_stream.next().await {
556                let chunk = chunk.map_err(Error::Http)?;
557                buffer.push_str(&String::from_utf8_lossy(&chunk));
558
559                while let Some(newline_pos) = buffer.find('\n') {
560                    let line = buffer[..newline_pos].trim().to_string();
561                    buffer = buffer[newline_pos + 1..].to_string();
562
563                    if line.is_empty() {
564                        continue;
565                    }
566
567                    match serde_json::from_str::<StreamOptionChain>(&line) {
568                        Ok(item) => yield item,
569                        Err(e) => {
570                            tracing::warn!("Failed to parse stream option chain: {e}, line: {line}");
571                        }
572                    }
573                }
574            }
575        };
576
577        Ok(Box::pin(stream))
578    }
579
580    /// Stream option quote updates for specific option legs.
581    pub async fn stream_option_quotes(
582        &mut self,
583        legs: &[&str],
584    ) -> Result<BoxStream<StreamOptionQuote>, Error> {
585        let legs_str = legs.join(",");
586        let path = format!("/v3/marketdata/stream/options/quotes/{}", legs_str);
587        let headers = self.auth_headers().await?;
588        let url = format!("{}{}", self.base_url(), &path);
589
590        let resp = self.http.get(&url).headers(headers).send().await?;
591
592        if !resp.status().is_success() {
593            let status = resp.status().as_u16();
594            let body = resp.text().await.unwrap_or_default();
595            return Err(Error::Api {
596                status,
597                message: body,
598            });
599        }
600
601        let stream = async_stream::try_stream! {
602            let mut bytes_stream = resp.bytes_stream();
603            let mut buffer = String::new();
604
605            use futures::StreamExt;
606            while let Some(chunk) = bytes_stream.next().await {
607                let chunk = chunk.map_err(Error::Http)?;
608                buffer.push_str(&String::from_utf8_lossy(&chunk));
609
610                while let Some(newline_pos) = buffer.find('\n') {
611                    let line = buffer[..newline_pos].trim().to_string();
612                    buffer = buffer[newline_pos + 1..].to_string();
613
614                    if line.is_empty() {
615                        continue;
616                    }
617
618                    match serde_json::from_str::<StreamOptionQuote>(&line) {
619                        Ok(item) => yield item,
620                        Err(e) => {
621                            tracing::warn!("Failed to parse stream option quote: {e}, line: {line}");
622                        }
623                    }
624                }
625            }
626        };
627
628        Ok(Box::pin(stream))
629    }
630
631    /// Stream order status updates for the specified accounts.
632    pub async fn stream_orders(
633        &mut self,
634        account_ids: &[&str],
635    ) -> Result<BoxStream<StreamOrder>, Error> {
636        let ids = account_ids.join(",");
637        let path = format!("/v3/brokerage/stream/accounts/{}/orders", ids);
638        let headers = self.auth_headers().await?;
639        let url = format!("{}{}", self.base_url(), &path);
640
641        let resp = self.http.get(&url).headers(headers).send().await?;
642
643        if !resp.status().is_success() {
644            let status = resp.status().as_u16();
645            let body = resp.text().await.unwrap_or_default();
646            return Err(Error::Api {
647                status,
648                message: body,
649            });
650        }
651
652        let stream = async_stream::try_stream! {
653            let mut bytes_stream = resp.bytes_stream();
654            let mut buffer = String::new();
655
656            use futures::StreamExt;
657            while let Some(chunk) = bytes_stream.next().await {
658                let chunk = chunk.map_err(Error::Http)?;
659                buffer.push_str(&String::from_utf8_lossy(&chunk));
660
661                while let Some(newline_pos) = buffer.find('\n') {
662                    let line = buffer[..newline_pos].trim().to_string();
663                    buffer = buffer[newline_pos + 1..].to_string();
664
665                    if line.is_empty() {
666                        continue;
667                    }
668
669                    match serde_json::from_str::<StreamOrder>(&line) {
670                        Ok(item) => yield item,
671                        Err(e) => {
672                            tracing::warn!("Failed to parse stream order: {e}, line: {line}");
673                        }
674                    }
675                }
676            }
677        };
678
679        Ok(Box::pin(stream))
680    }
681
682    /// Stream order updates for specific order IDs.
683    pub async fn stream_orders_by_id(
684        &mut self,
685        account_ids: &[&str],
686        order_ids: &[&str],
687    ) -> Result<BoxStream<StreamOrder>, Error> {
688        let ids = account_ids.join(",");
689        let oids = order_ids.join(",");
690        let path = format!("/v3/brokerage/stream/accounts/{}/orders/{}", ids, oids);
691        let headers = self.auth_headers().await?;
692        let url = format!("{}{}", self.base_url(), &path);
693
694        let resp = self.http.get(&url).headers(headers).send().await?;
695
696        if !resp.status().is_success() {
697            let status = resp.status().as_u16();
698            let body = resp.text().await.unwrap_or_default();
699            return Err(Error::Api {
700                status,
701                message: body,
702            });
703        }
704
705        let stream = async_stream::try_stream! {
706            let mut bytes_stream = resp.bytes_stream();
707            let mut buffer = String::new();
708
709            use futures::StreamExt;
710            while let Some(chunk) = bytes_stream.next().await {
711                let chunk = chunk.map_err(Error::Http)?;
712                buffer.push_str(&String::from_utf8_lossy(&chunk));
713
714                while let Some(newline_pos) = buffer.find('\n') {
715                    let line = buffer[..newline_pos].trim().to_string();
716                    buffer = buffer[newline_pos + 1..].to_string();
717
718                    if line.is_empty() {
719                        continue;
720                    }
721
722                    match serde_json::from_str::<StreamOrder>(&line) {
723                        Ok(item) => yield item,
724                        Err(e) => {
725                            tracing::warn!("Failed to parse stream order: {e}, line: {line}");
726                        }
727                    }
728                }
729            }
730        };
731
732        Ok(Box::pin(stream))
733    }
734
735    /// Stream position updates for the specified accounts.
736    pub async fn stream_positions(
737        &mut self,
738        account_ids: &[&str],
739    ) -> Result<BoxStream<StreamPosition>, Error> {
740        let ids = account_ids.join(",");
741        let path = format!("/v3/brokerage/stream/accounts/{}/positions", ids);
742        let headers = self.auth_headers().await?;
743        let url = format!("{}{}", self.base_url(), &path);
744
745        let resp = self.http.get(&url).headers(headers).send().await?;
746
747        if !resp.status().is_success() {
748            let status = resp.status().as_u16();
749            let body = resp.text().await.unwrap_or_default();
750            return Err(Error::Api {
751                status,
752                message: body,
753            });
754        }
755
756        let stream = async_stream::try_stream! {
757            let mut bytes_stream = resp.bytes_stream();
758            let mut buffer = String::new();
759
760            use futures::StreamExt;
761            while let Some(chunk) = bytes_stream.next().await {
762                let chunk = chunk.map_err(Error::Http)?;
763                buffer.push_str(&String::from_utf8_lossy(&chunk));
764
765                while let Some(newline_pos) = buffer.find('\n') {
766                    let line = buffer[..newline_pos].trim().to_string();
767                    buffer = buffer[newline_pos + 1..].to_string();
768
769                    if line.is_empty() {
770                        continue;
771                    }
772
773                    match serde_json::from_str::<StreamPosition>(&line) {
774                        Ok(item) => yield item,
775                        Err(e) => {
776                            tracing::warn!("Failed to parse stream position: {e}, line: {line}");
777                        }
778                    }
779                }
780            }
781        };
782
783        Ok(Box::pin(stream))
784    }
785}