betfair_stream_api/
lib.rs

1//! Betfair Stream API client implementation.
2//!
3//! This crate provides an asynchronous client for interacting with Betfair's Streaming API.
4//! It manages connection setup, handshake, framing, heartbeats, and automatic reconnections.
5//! Users can customize how incoming messages are handled by implementing the `MessageProcessor` trait
6//! or using the built-in `Cache` processor for maintaining market and order caches.
7extern crate alloc;
8pub mod cache;
9use backon::{BackoffBuilder as _, ExponentialBuilder};
10use betfair_adapter::{Authenticated, BetfairRpcClient, Unauthenticated};
11pub use betfair_stream_types as types;
12use betfair_stream_types::{
13    request::{RequestMessage, authentication_message, heartbeat_message::HeartbeatMessage},
14    response::{
15        ResponseMessage,
16        connection_message::ConnectionMessage,
17        status_message::{ErrorCode, StatusMessage},
18    },
19};
20use cache::{
21    primitives::{MarketBookCache, OrderBookCache},
22    tracker::StreamState,
23};
24use core::fmt;
25use core::{pin::pin, time::Duration};
26use eyre::Context as _;
27use futures::{
28    FutureExt, SinkExt as _, StreamExt as _,
29    future::{self, BoxFuture, select},
30};
31use std::sync::Arc;
32use tokio::{
33    net::TcpStream,
34    sync::mpsc::{self, Receiver, Sender},
35    task::JoinHandle,
36    time::sleep,
37};
38use tokio_stream::wrappers::{IntervalStream, ReceiverStream};
39use tokio_util::{
40    bytes,
41    codec::{Decoder, Encoder, Framed},
42};
43
44/// A Betfair Stream API client that handles connection, handshake, incoming/outgoing messages,
45/// heartbeat and automatic reconnects.
46/// Builder for creating a Betfair Streaming API client.
47///
48/// # Type Parameters
49///
50/// - `T`: A type that implements `MessageProcessor`, used to handle incoming `ResponseMessage` objects.
51#[derive(Debug, Clone)]
52pub struct BetfairStreamBuilder<T: MessageProcessor> {
53    /// betfair client
54    pub client: BetfairRpcClient<Unauthenticated>,
55    /// Heartbeat interval (used only if heartbeat_enabled is true)
56    pub heartbeat_interval: Option<Duration>,
57    /// The intermediate processor of messages
58    pub processor: T,
59}
60
61/// Handle to a running Betfair Streaming API client.
62///
63/// Provides channels to send requests (`send_to_stream`) and receive processed messages (`sink`).
64#[derive(Debug)]
65pub struct BetfairStreamClient<T: MessageProcessor> {
66    /// send a message to the Betfair stream
67    pub send_to_stream: Sender<RequestMessage>,
68    /// Receive a message from the stream
69    pub sink: Receiver<T::Output>,
70}
71
72/// Default `MessageProcessor` implementation that maintains market and order caches.
73///
74/// It updates an internal `StreamState` to apply incremental updates to market and order books.
75#[derive(Debug, Clone)]
76pub struct Cache {
77    state: StreamState,
78}
79
80/// Variants of messages produced by the cache-based processor.
81///
82/// `CachedMessage` represents high-level events derived from raw Betfair streaming responses,
83/// enriched with internal cache state for market and order books.
84#[derive(Debug, Clone, PartialEq, Eq)]
85pub enum CachedMessage {
86    /// A connection handshake message received from the stream,
87    /// containing connection ID and related metadata.
88    /// Also returned on heartbeat messages.
89    Connection(ConnectionMessage),
90
91    /// A batch of market book updates, each describing the current state or changes of a market.
92    MarketChange(Vec<MarketBookCache>),
93
94    /// A batch of order book updates, representing new orders, matched orders,
95    /// and cancellations in the order cache.
96    OrderChange(Vec<OrderBookCache>),
97
98    /// A status message from the stream, used for heartbeats,
99    /// subscription confirmations, or error notifications.
100    Status(StatusMessage),
101}
102
103impl MessageProcessor for Cache {
104    type Output = CachedMessage;
105
106    fn process_message(&mut self, message: ResponseMessage) -> Option<Self::Output> {
107        match message {
108            ResponseMessage::Connection(connection_message) => {
109                Some(CachedMessage::Connection(connection_message))
110            }
111            ResponseMessage::MarketChange(market_change_message) => self
112                .state
113                .market_change_update(market_change_message)
114                .map(|markets| markets.into_iter().cloned().collect::<Vec<_>>())
115                .map(CachedMessage::MarketChange),
116            ResponseMessage::OrderChange(order_change_message) => self
117                .state
118                .order_change_update(order_change_message)
119                .map(|markets| markets.into_iter().cloned().collect::<Vec<_>>())
120                .map(CachedMessage::OrderChange),
121            ResponseMessage::Status(status_message) => Some(CachedMessage::Status(status_message)),
122        }
123    }
124}
125
126/// `MessageProcessor` that forwards raw `ResponseMessage` objects without transformation.
127#[derive(Debug)]
128pub struct Forwarder;
129impl MessageProcessor for Forwarder {
130    type Output = ResponseMessage;
131
132    fn process_message(&mut self, message: ResponseMessage) -> Option<Self::Output> {
133        Some(message)
134    }
135}
136/// Trait for processing incoming Betfair streaming `ResponseMessage` objects into user-defined outputs.
137///
138/// Implementers can filter or transform messages and control which messages are forwarded to the client sink.
139pub trait MessageProcessor: Send + Sync + 'static {
140    /// The processed message type produced by `process_message`
141    type Output: Send + Clone + Sync + 'static + core::fmt::Debug;
142
143    /// Process an incoming `ResponseMessage`.
144    ///
145    /// Returns `Some(Output)` to forward a processed message, or `None` to drop it.
146    fn process_message(&mut self, message: ResponseMessage) -> Option<Self::Output>;
147}
148
149impl<T: MessageProcessor> BetfairStreamBuilder<T> {
150    /// Creates a new `BetfairStreamBuilder` with the given authenticated RPC client.
151    ///
152    /// Uses the default `Cache` message processor to maintain market and order caches.
153    /// By default, no heartbeat messages are sent.
154    ///
155    /// # Parameters
156    ///
157    /// * `client` - An authenticated Betfair RPC client for establishing the streaming connection.
158    ///
159    /// # Returns
160    ///
161    /// A `BetfairStreamBuilder` configured with cache-based message processing.
162    pub fn new(client: BetfairRpcClient<Unauthenticated>) -> BetfairStreamBuilder<Cache> {
163        BetfairStreamBuilder {
164            client,
165            heartbeat_interval: None,
166            processor: Cache {
167                state: StreamState::new(),
168            },
169        }
170    }
171
172    /// Creates a new `BetfairStreamBuilder` with raw message forwarding.
173    ///
174    /// Uses the `Forwarder` message processor to forward raw `ResponseMessage` objects without caching.
175    /// By default, no heartbeat messages are sent.
176    ///
177    /// # Parameters
178    ///
179    /// * `client` - An authenticated Betfair RPC client for establishing the streaming connection.
180    ///
181    /// # Returns
182    ///
183    /// A `BetfairStreamBuilder` configured to forward raw messages.
184    pub fn new_without_cache(
185        client: BetfairRpcClient<Unauthenticated>,
186    ) -> BetfairStreamBuilder<Forwarder> {
187        BetfairStreamBuilder {
188            client,
189            heartbeat_interval: None,
190            processor: Forwarder,
191        }
192    }
193
194    /// Enables periodic heartbeat messages to keep the streaming connection alive.
195    ///
196    /// # Parameters
197    ///
198    /// * `interval` - The duration between heartbeat messages.
199    ///
200    /// # Returns
201    ///
202    /// The updated `BetfairStreamBuilder` with heartbeat enabled.
203    pub fn with_heartbeat(mut self, interval: Duration) -> Self {
204        self.heartbeat_interval = Some(interval);
205        self
206    }
207
208    /// Starts the Betfair streaming client and returns handles for interaction.
209    ///
210    /// This will spawn an asynchronous task that manages the connection, handshake,
211    /// incoming/outgoing messages, heartbeats (if enabled), and automatic reconnections.
212    ///
213    /// # Type Parameters
214    ///
215    /// * `C` - The capacity of the internal message channels.
216    /// * `Sp` - The type of the spawner function.
217    /// * `H` - The type of the handle returned by the spawner.
218    ///
219    /// # Parameters
220    ///
221    /// * `spawner` - A function that takes a boxed future and returns a handle to the spawned task.
222    ///
223    /// # Returns
224    ///
225    /// * `BetfairStreamClient<T>` - A client handle providing:
226    ///     - `send_to_stream`: a channel sender for outgoing `RequestMessage`s.
227    ///     - `sink`: a channel receiver for processed messages of type `T::Output`.
228    /// * `H` - A handle to the background task driving the streaming logic, type depends on the spawner.
229    pub fn start_with<const C: usize, Sp, H>(self, spawner: Sp) -> (BetfairStreamClient<T>, H)
230    where
231        Sp: FnOnce(BoxFuture<'static, eyre::Result<()>>) -> H,
232    {
233        let (to_stream_tx, to_stream_rx) = mpsc::channel(C);
234        let (from_stream_tx, from_stream_rx) = mpsc::channel(C);
235
236        // let task = tokio::task::spawn(self.run(from_stream_tx, to_stream_rx));
237        let fut = self.run(from_stream_tx, to_stream_rx).boxed();
238        let handle = spawner(fut);
239
240        (
241            BetfairStreamClient {
242                send_to_stream: to_stream_tx,
243                sink: from_stream_rx,
244            },
245            handle,
246        )
247    }
248
249    /// Starts the Betfair streaming client with the default Tokio task spawner.
250    ///
251    /// This is a convenience method that uses `tokio::spawn` to run the streaming task.
252    ///
253    /// # Type Parameters
254    ///
255    /// * `C` - The capacity of the internal message channels.
256    ///
257    /// # Returns
258    ///
259    /// * `BetfairStreamClient<T>` - A client handle for interacting with the stream.
260    /// * `JoinHandle<eyre::Result<()>>` - A Tokio join handle for the background streaming task.
261    pub fn start<const C: usize>(self) -> (BetfairStreamClient<T>, JoinHandle<eyre::Result<()>>) {
262        self.start_with::<C, _, _>(|fut| tokio::spawn(fut))
263    }
264
265    async fn run(
266        self,
267        from_stream_tx: Sender<T::Output>,
268        to_stream_rx: Receiver<RequestMessage>,
269    ) -> eyre::Result<()> {
270        if let Some(hb) = self.heartbeat_interval {
271            let heartbeat_stream = {
272                let mut interval = tokio::time::interval(hb);
273                interval.reset();
274                let interval_stream = IntervalStream::new(interval).fuse();
275                interval_stream
276                    .map(move |instant| HeartbeatMessage {
277                        id: Some(
278                            instant
279                                .into_std()
280                                .elapsed()
281                                .as_secs()
282                                .try_into()
283                                .unwrap_or_default(),
284                        ),
285                    })
286                    .map(RequestMessage::Heartbeat)
287                    .boxed()
288            };
289            let input_stream = futures::stream::select_all([
290                heartbeat_stream,
291                ReceiverStream::new(to_stream_rx).boxed(),
292            ]);
293
294            self.run_base(from_stream_tx, input_stream).await
295        } else {
296            self.run_base(from_stream_tx, ReceiverStream::new(to_stream_rx))
297                .await
298        }
299    }
300
301    async fn run_base(
302        mut self,
303        mut from_stream_tx: Sender<T::Output>,
304        mut to_stream_rx: impl futures::Stream<Item = RequestMessage> + Unpin,
305    ) -> eyre::Result<()> {
306        let (mut client, _) = self.client.clone().authenticate().await?;
307        let mut backoff = ExponentialBuilder::new().build();
308        let mut first_call = true;
309        'retry: loop {
310            if !first_call {
311                // add exponential recovery
312                let Some(delay) = backoff.next() else {
313                    eyre::bail!("connection retry attempts exceeded")
314                };
315                sleep(delay).await;
316            }
317            first_call = true;
318
319            // Connect (with handshake) using retry logic.
320            let mut stream = self
321                .connect_with_retry(&mut from_stream_tx, &mut client)
322                .await?;
323            tracing::info!("Connected to {}", self.client.stream.url());
324
325            loop {
326                let stream_next = pin!(stream.next());
327                let to_stream_rx_next = pin!(to_stream_rx.next());
328                match select(to_stream_rx_next, stream_next).await {
329                    future::Either::Left((request, _)) => {
330                        let Some(request) = request else {
331                            tracing::info!("request channel closed, shutting down stream task");
332                            return Ok(());
333                        };
334
335                        tracing::debug!(?request, "sending to betfair");
336                        let Ok(()) = stream.send(request).await else {
337                            tracing::warn!("could not send request to stream");
338                            continue 'retry;
339                        };
340                    }
341                    future::Either::Right((message, _)) => {
342                        let Some(message) = message else {
343                            tracing::warn!("stream returned None");
344                            continue 'retry;
345                        };
346
347                        match message {
348                            Ok(message) => {
349                                let message = self.processor.process_message(message);
350                                tracing::debug!(?message, "received from betfair");
351                                let Some(message) = message else {
352                                    continue;
353                                };
354
355                                if let Err(err) = from_stream_tx.send(message).await {
356                                    tracing::info!(
357                                        "output channel receiver dropped, shutting down stream task: {:?}",
358                                        err
359                                    );
360                                    return Ok(());
361                                };
362                            }
363                            Err(err) => tracing::warn!(?err, "reading message error"),
364                        }
365                    }
366                }
367            }
368        }
369    }
370
371    /// Attempt to connect and perform a handshake using exponential backoff.
372    #[tracing::instrument(skip_all, err)]
373    async fn connect_with_retry(
374        &mut self,
375        from_stream_tx: &mut Sender<T::Output>,
376        client: &mut Arc<BetfairRpcClient<Authenticated>>,
377    ) -> eyre::Result<Framed<tokio_rustls::client::TlsStream<TcpStream>, StreamAPIClientCodec>>
378    {
379        let mut backoff = ExponentialBuilder::new().build();
380        let mut delay = async || {
381            if let Some(delay) = backoff.next() {
382                sleep(delay).await;
383                Ok(())
384            } else {
385                eyre::bail!("exceeded retry attempts, could not connect");
386            }
387        };
388
389        loop {
390            let server_addr = self.client.stream.url();
391            let host = server_addr
392                .host_str()
393                .ok_or_else(|| eyre::eyre!("invalid betfair url"))?;
394            let port = server_addr.port().unwrap_or(443);
395
396            let domain_str = server_addr
397                .domain()
398                .ok_or_else(|| eyre::eyre!("domain must be known"))?;
399            let domain = rustls::pki_types::ServerName::try_from(domain_str.to_owned())
400                .wrap_err("failed to parse server name")?;
401
402            // Resolve socket addresses each iteration in case DNS changes
403            let Some(socket_addr) = tokio::net::lookup_host((host, port)).await?.next() else {
404                eyre::bail!("no valid socket addresses for {host}:{port}")
405            };
406
407            let tcp_stream = TcpStream::connect(socket_addr).await;
408            let Ok(stream) = tcp_stream else {
409                tracing::error!(err = ?tcp_stream.unwrap_err(), "Connect error. Retrying...");
410                delay().await?;
411                continue;
412            };
413            let tls_stream = tls_connector()?.connect(domain.clone(), stream).await?;
414            let mut tls_stream = Framed::new(tls_stream, StreamAPIClientCodec);
415
416            match self
417                .handshake(from_stream_tx, client, &mut tls_stream)
418                .await
419            {
420                Ok(()) => return Ok(tls_stream),
421                Err(err) => match err {
422                    HandshakeErr::WaitAndRetry => {
423                        delay().await?;
424                        continue;
425                    }
426                    HandshakeErr::Reauthenticate => {
427                        let (new_client, _) = self.client.clone().authenticate().await?;
428                        *client = new_client;
429                        delay().await?;
430                        continue;
431                    }
432                    HandshakeErr::Fatal => eyre::bail!("fatal error in stream processing"),
433                },
434            }
435        }
436    }
437
438    #[tracing::instrument(err, skip_all)]
439    async fn handshake(
440        &mut self,
441        from_stream_tx: &mut Sender<T::Output>,
442        client: &BetfairRpcClient<Authenticated>,
443        stream: &mut Framed<tokio_rustls::client::TlsStream<TcpStream>, StreamAPIClientCodec>,
444    ) -> Result<(), HandshakeErr> {
445        // await con message
446        let res = stream
447            .next()
448            .await
449            .transpose()
450            .inspect_err(|err| {
451                tracing::warn!(?err, "error when parsing stream message");
452            })
453            .map_err(|_| HandshakeErr::WaitAndRetry)?
454            .ok_or(HandshakeErr::WaitAndRetry)?;
455        tracing::info!(?res, "message from stream");
456        let message = self
457            .processor
458            .process_message(res.clone())
459            .ok_or(HandshakeErr::Fatal)
460            .inspect_err(|_err| {
461                tracing::error!(
462                    "processor.process_message returned None for connection message: {:?}",
463                    res
464                )
465            })?;
466        from_stream_tx
467            .send(message.clone())
468            .await
469            .inspect_err(|err| {
470                tracing::warn!("failed to send connection message to channel: {:?}", err)
471            })
472            .map_err(|_| HandshakeErr::Fatal)?;
473        let ResponseMessage::Connection(_) = &res else {
474            tracing::warn!("stream responded with invalid connection message");
475            return Err(HandshakeErr::Reauthenticate);
476        };
477
478        // send auth msg
479        let msg = authentication_message::AuthenticationMessage {
480            id: Some(-1),
481            session: client.session_token().0.expose_secret().clone(),
482            app_key: self
483                .client
484                .secret_provider
485                .application_key
486                .0
487                .expose_secret()
488                .clone(),
489        };
490        stream
491            .send(RequestMessage::Authentication(msg))
492            .await
493            .inspect_err(|err| tracing::warn!(?err, "stream exited"))
494            .map_err(|_| HandshakeErr::WaitAndRetry)?;
495
496        // await status message
497        let message = stream
498            .next()
499            .await
500            .transpose()
501            .inspect_err(|err| {
502                tracing::warn!(?err, "error when parsing stream message");
503            })
504            .map_err(|_| HandshakeErr::WaitAndRetry)?
505            .ok_or(HandshakeErr::WaitAndRetry)?;
506        let processed_message = self
507            .processor
508            .process_message(message.clone())
509            .ok_or(HandshakeErr::Fatal)
510            .inspect_err(|_err| {
511                tracing::warn!(
512                    "processor.process_message returned None for status message: {:?}",
513                    message
514                )
515            })
516            .map_err(|_| HandshakeErr::Fatal)?;
517        from_stream_tx
518            .send(processed_message)
519            .await
520            .inspect_err(|err| {
521                tracing::warn!("failed to send status message to channel: {:?}", err)
522            })
523            .map_err(|_| HandshakeErr::Fatal)?;
524        tracing::info!(?message, "message from stream");
525        let ResponseMessage::Status(status_message) = &message else {
526            tracing::warn!("expected status message, got {message:?}");
527            return Err(HandshakeErr::WaitAndRetry);
528        };
529
530        let StatusMessage::Failure(err) = &status_message else {
531            return Ok(());
532        };
533
534        tracing::error!(?err, "stream respondend with an error");
535        let action = match err.error_code {
536            ErrorCode::NoAppKey => HandshakeErr::Fatal,
537            ErrorCode::InvalidAppKey => HandshakeErr::Fatal,
538            ErrorCode::NoSession => HandshakeErr::Reauthenticate,
539            ErrorCode::InvalidSessionInformation => HandshakeErr::Reauthenticate,
540            ErrorCode::NotAuthorized => HandshakeErr::Reauthenticate,
541            ErrorCode::InvalidInput => HandshakeErr::Fatal,
542            ErrorCode::InvalidClock => HandshakeErr::Fatal,
543            ErrorCode::UnexpectedError => HandshakeErr::Fatal,
544            ErrorCode::Timeout => HandshakeErr::WaitAndRetry,
545            ErrorCode::SubscriptionLimitExceeded => HandshakeErr::WaitAndRetry,
546            ErrorCode::InvalidRequest => HandshakeErr::Fatal,
547            ErrorCode::ConnectionFailed => HandshakeErr::WaitAndRetry,
548            ErrorCode::MaxConnectionLimitExceeded => HandshakeErr::Fatal,
549            ErrorCode::TooManyRequests => HandshakeErr::WaitAndRetry,
550        };
551
552        Err(action)
553    }
554}
555
/// Reason a stream handshake failed, expressed as the action the caller should take.
#[derive(Debug)]
enum HandshakeErr {
    /// Wait for the next backoff delay, then connect again.
    WaitAndRetry,
    /// Authenticate again to obtain a new session before reconnecting.
    Reauthenticate,
    /// Give up; retrying is not expected to help.
    Fatal,
}

impl fmt::Display for HandshakeErr {
    /// Writes a fixed prefix followed by the variant's `Debug` name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("Stream Handshake Error {self:?}"))
    }
}

impl core::error::Error for HandshakeErr {}
570
571#[tracing::instrument(err)]
572fn tls_connector() -> eyre::Result<tokio_rustls::TlsConnector> {
573    use tokio_rustls::TlsConnector;
574
575    let mut roots = rustls::RootCertStore::empty();
576    let native_certs = rustls_native_certs::load_native_certs();
577    for cert in native_certs.certs {
578        roots.add(cert)?;
579    }
580
581    let config = rustls::ClientConfig::builder()
582        .with_root_certificates(roots)
583        .with_no_client_auth();
584    Ok(TlsConnector::from(Arc::new(config)))
585}
586
587/// Defines the encoding and decoding of Betfair stream api data structures using tokio
588pub struct StreamAPIClientCodec;
589
590impl Decoder for StreamAPIClientCodec {
591    type Item = ResponseMessage;
592    type Error = eyre::Report;
593
594    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
595        // Find position of `\n` first
596        if let Some(pos) = src.iter().position(|&byte| byte == b'\n') {
597            // Check if the preceding byte is `\r`
598            let delimiter_size = if pos > 0 && src[pos - 1] == b'\r' {
599                2
600            } else {
601                1
602            };
603
604            // Extract up to and including the delimiter
605            let line = src.split_to(pos + 1);
606
607            // Separate out the delimiter bytes
608            let (json_part, _) = line.split_at(line.len().saturating_sub(delimiter_size));
609
610            // Now we can parse it as JSON
611            let data = serde_json::from_slice::<Self::Item>(json_part)?;
612            return Ok(Some(data));
613        }
614        Ok(None)
615    }
616}
617
618impl Encoder<RequestMessage> for StreamAPIClientCodec {
619    type Error = eyre::Report;
620
621    fn encode(
622        &mut self,
623        item: RequestMessage,
624        dst: &mut bytes::BytesMut,
625    ) -> Result<(), Self::Error> {
626        // Serialize the item to a JSON string
627        let json = serde_json::to_string(&item)?;
628        // Write the JSON string to the buffer, followed by a newline
629        dst.extend_from_slice(json.as_bytes());
630        dst.extend_from_slice(b"\r\n");
631        Ok(())
632    }
633}
634
#[cfg(test)]
mod tests {
    use super::*;

    /// Connection message as sent by the stream right after connecting.
    const CONNECTION_MSG: &str = r#"{"op":"connection","connectionId":"002-051134157842-432409"}"#;
    /// Order change message of type heartbeat.
    const ORDER_CHANGE_MSG: &str = r#"{"op":"ocm","id":3,"clk":"AAAAAAAA","status":503,"pt":1498137379766,"ct":"HEARTBEAT"}"#;
    /// Line terminator used by the stream.
    const SEPARATOR: &str = "\r\n";

    /// Creates a read buffer pre-filled with `data`.
    fn buffer_with(data: &str) -> bytes::BytesMut {
        bytes::BytesMut::from(data.as_bytes())
    }

    /// Decodes the next message, panicking on a decode error.
    fn decode_next(buf: &mut bytes::BytesMut) -> Option<ResponseMessage> {
        StreamAPIClientCodec.decode(buf).unwrap()
    }

    // Requires network access: resolves the production stream host.
    #[tokio::test]
    async fn can_resolve_host_ipv4() {
        let url = url::Url::parse("tcptls://stream-api.betfair.com:443").unwrap();
        let host = url.host_str().unwrap();
        let port = match url.port() {
            Some(port) => port,
            None if url.scheme() == "https" => 443,
            None => 80,
        };
        let mut resolved = tokio::net::lookup_host((host, port)).await.unwrap();
        let socket_addr = resolved.next().unwrap();
        assert!(socket_addr.ip().is_ipv4());
        assert_eq!(socket_addr.port(), 443);
    }

    #[test]
    fn can_decode_single_message() {
        let mut buf = buffer_with(&format!("{CONNECTION_MSG}{SEPARATOR}"));

        let msg = decode_next(&mut buf).unwrap();

        assert!(matches!(msg, ResponseMessage::Connection(_)));
    }

    #[test]
    fn can_decode_multiple_messages() {
        // Both messages arrive in a single read.
        let mut buf = buffer_with(&format!(
            "{CONNECTION_MSG}{SEPARATOR}{ORDER_CHANGE_MSG}{SEPARATOR}"
        ));

        let msg_one = decode_next(&mut buf).unwrap();
        let msg_two = decode_next(&mut buf).unwrap();

        assert!(matches!(msg_one, ResponseMessage::Connection(_)));
        assert!(matches!(msg_two, ResponseMessage::OrderChange(_)));
    }

    #[test]
    fn can_decode_multiple_partial_messages() {
        // The second message is split across two reads.
        let msg_two_pt_one = r#"{"op":"ocm","id":3,"clk""#;
        let msg_two_pt_two = r#":"AAAAAAAA","status":503,"pt":1498137379766,"ct":"HEARTBEAT"}"#;
        let mut buf = buffer_with(&format!("{CONNECTION_MSG}{SEPARATOR}{msg_two_pt_one}"));

        let msg_one = decode_next(&mut buf).unwrap();
        assert!(decode_next(&mut buf).is_none());

        buf.extend_from_slice(msg_two_pt_two.as_bytes());
        buf.extend_from_slice(SEPARATOR.as_bytes());
        let msg_two = decode_next(&mut buf).unwrap();

        assert!(matches!(msg_one, ResponseMessage::Connection(_)));
        assert!(matches!(msg_two, ResponseMessage::OrderChange(_)));
    }

    #[test]
    fn can_decode_subsequent_messages() {
        // The second message arrives only after the first one has been decoded.
        let mut buf = buffer_with(&format!("{CONNECTION_MSG}{SEPARATOR}"));

        let msg_one = decode_next(&mut buf).unwrap();
        assert!(decode_next(&mut buf).is_none());

        buf.extend_from_slice(format!("{ORDER_CHANGE_MSG}{SEPARATOR}").as_bytes());
        let msg_two = decode_next(&mut buf).unwrap();

        assert!(matches!(msg_one, ResponseMessage::Connection(_)));
        assert!(matches!(msg_two, ResponseMessage::OrderChange(_)));
    }

    #[test]
    fn can_encode_message() {
        let auth = betfair_stream_types::request::authentication_message::AuthenticationMessage {
            id: Some(1),
            session: "sss".to_owned(),
            app_key: "aaaa".to_owned(),
        };
        let mut buf = bytes::BytesMut::new();
        StreamAPIClientCodec
            .encode(RequestMessage::Authentication(auth), &mut buf)
            .unwrap();

        let encoded = buf.freeze();
        let text = core::str::from_utf8(&encoded).unwrap();

        // assert that we have the suffix \r\n
        assert!(text.ends_with("\r\n"));
        // assert that we have the prefix {"op":"authentication"
        assert!(text.starts_with("{\"op\":\"authentication\""));
    }
}