betfair_stream_api/connection/
builder.rs

1use core::convert::Infallible as Never;
2use core::time::Duration;
3
4use betfair_stream_types::request::RequestMessage;
5use betfair_stream_types::response::ResponseMessage;
6use tokio::runtime::Handle;
7use tokio::task::JoinSet;
8
9use super::cron::{self, FatalError};
10use super::StreamApi;
11use crate::{CacheEnabledMessages, ExternalUpdates};
12
/// Strategy that decides whether, and how often, heartbeat messages are sent.
///
/// Besides `Debug` and `Clone`, the type derives `PartialEq` and `Eq` so that a
/// configured strategy can be compared directly (for example in tests or when
/// validating configuration).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeartbeatStrategy {
    /// Do not send heartbeat messages.
    None,
    /// Send heartbeat messages at the given interval.
    Interval(Duration),
}
21
22/// Builder for creating a `StreamApi` instance.
23#[expect(clippy::module_name_repetitions)]
24#[derive(Debug)]
25pub struct StreamApiBuilder {
26    /// Sender for sending commands to the underlying stream.
27    command_sender: tokio::sync::broadcast::Sender<RequestMessage>,
28    /// Receiver for reading commands from the underlying stream.
29    command_reader: tokio::sync::broadcast::Receiver<RequestMessage>,
30    /// Betfair provider for making requests.
31    provider: betfair_adapter::UnauthenticatedBetfairRpcProvider,
32    /// Heartbeat strategy for the stream.
33    hb: HeartbeatStrategy,
34}
35
36impl StreamApiBuilder {
37    /// Creates a new `StreamApiBuilder` with the specified provider and heartbeat strategy.
38    #[must_use]
39    pub fn new(
40        provider: betfair_adapter::UnauthenticatedBetfairRpcProvider,
41        hb: HeartbeatStrategy,
42    ) -> Self {
43        let (command_sender, command_reader) = tokio::sync::broadcast::channel(3);
44
45        Self {
46            command_sender,
47            command_reader,
48            provider,
49            hb,
50        }
51    }
52
53    /// Runs the `StreamApi` with the default Tokio runtime.
54    #[must_use]
55    pub fn run_with_default_runtime(&self) -> StreamApi<ResponseMessage> {
56        self.run(&Handle::current())
57    }
58
59    /// Runs the `StreamApi` with the specified Tokio runtime handle.
60    #[must_use]
61    pub fn run(&self, rt_handle: &tokio::runtime::Handle) -> StreamApi<ResponseMessage> {
62        let (join_set, data_feed) = self.run_internal(rt_handle);
63        StreamApi::new(
64            join_set,
65            data_feed,
66            self.command_sender.clone(),
67            rt_handle.clone(),
68        )
69    }
70
71    /// Runs the `StreamApi` with caching enabled.
72    #[must_use]
73    pub fn run_with_cache(
74        &self,
75        rt_handle: &tokio::runtime::Handle,
76    ) -> StreamApi<CacheEnabledMessages> {
77        let (mut join_set, data_feed) = self.run_internal(rt_handle);
78        let output_queue_reader_post_cache =
79            wrap_with_cache_layer(&mut join_set, data_feed, rt_handle);
80        StreamApi::new(
81            join_set,
82            output_queue_reader_post_cache,
83            self.command_sender.clone(),
84            rt_handle.clone(),
85        )
86    }
87
88    /// Internal method to run the `StreamApi` and return a join set and data feed.
89    fn run_internal(
90        &self,
91        rt_handle: &tokio::runtime::Handle,
92    ) -> (
93        JoinSet<Result<Never, FatalError>>,
94        tokio::sync::mpsc::Receiver<ExternalUpdates<ResponseMessage>>,
95    ) {
96        let (output_queue_sender, output_queue_reader) = tokio::sync::mpsc::channel(3);
97
98        let mut join_set = JoinSet::new();
99        join_set.spawn_on(
100            {
101                let command_reader = self.command_reader.resubscribe();
102                let command_sender = self.command_sender.clone();
103                let provider = self.provider.clone();
104                let runtime_handle = rt_handle.clone();
105                let hb = self.hb.clone();
106                async move {
107                    cron::StreamConnectionProcessor {
108                        sender: output_queue_sender,
109                        command_reader,
110                        command_sender,
111                        provider,
112                        runtime_handle,
113                        hb,
114                        last_time_token_refreshed: None,
115                    }
116                    .connect_and_process_loop()
117                    .await
118                }
119            },
120            rt_handle,
121        );
122
123        (join_set, output_queue_reader)
124    }
125}
126
127/// Wraps the data feed with a cache layer and returns the receiver for external updates.
128pub(crate) fn wrap_with_cache_layer(
129    join_set: &mut JoinSet<Result<Never, FatalError>>,
130    data_feed: tokio::sync::mpsc::Receiver<ExternalUpdates<ResponseMessage>>,
131    rt_handle: &tokio::runtime::Handle,
132) -> tokio::sync::mpsc::Receiver<ExternalUpdates<CacheEnabledMessages>> {
133    let (output_queue_sender_post_cache, output_queue_reader_post_cache) =
134        tokio::sync::mpsc::channel(3);
135    join_set.spawn_on(
136        cron::cache_loop(data_feed, output_queue_sender_post_cache),
137        rt_handle,
138    );
139    output_queue_reader_post_cache
140}