betfair_stream_api/connection/
builder.rs1use core::convert::Infallible as Never;
2use core::time::Duration;
3
4use betfair_stream_types::request::RequestMessage;
5use betfair_stream_types::response::ResponseMessage;
6use tokio::runtime::Handle;
7use tokio::task::JoinSet;
8
9use super::cron::{self, FatalError};
10use super::StreamApi;
11use crate::{CacheEnabledMessages, ExternalUpdates};
12
13#[derive(Debug, Clone)]
15pub enum HeartbeatStrategy {
16 None,
18 Interval(Duration),
20}
21
22#[expect(clippy::module_name_repetitions)]
24#[derive(Debug)]
25pub struct StreamApiBuilder {
26 command_sender: tokio::sync::broadcast::Sender<RequestMessage>,
28 command_reader: tokio::sync::broadcast::Receiver<RequestMessage>,
30 provider: betfair_adapter::UnauthenticatedBetfairRpcProvider,
32 hb: HeartbeatStrategy,
34}
35
36impl StreamApiBuilder {
37 #[must_use]
39 pub fn new(
40 provider: betfair_adapter::UnauthenticatedBetfairRpcProvider,
41 hb: HeartbeatStrategy,
42 ) -> Self {
43 let (command_sender, command_reader) = tokio::sync::broadcast::channel(3);
44
45 Self {
46 command_sender,
47 command_reader,
48 provider,
49 hb,
50 }
51 }
52
53 #[must_use]
55 pub fn run_with_default_runtime(&self) -> StreamApi<ResponseMessage> {
56 self.run(&Handle::current())
57 }
58
59 #[must_use]
61 pub fn run(&self, rt_handle: &tokio::runtime::Handle) -> StreamApi<ResponseMessage> {
62 let (join_set, data_feed) = self.run_internal(rt_handle);
63 StreamApi::new(
64 join_set,
65 data_feed,
66 self.command_sender.clone(),
67 rt_handle.clone(),
68 )
69 }
70
71 #[must_use]
73 pub fn run_with_cache(
74 &self,
75 rt_handle: &tokio::runtime::Handle,
76 ) -> StreamApi<CacheEnabledMessages> {
77 let (mut join_set, data_feed) = self.run_internal(rt_handle);
78 let output_queue_reader_post_cache =
79 wrap_with_cache_layer(&mut join_set, data_feed, rt_handle);
80 StreamApi::new(
81 join_set,
82 output_queue_reader_post_cache,
83 self.command_sender.clone(),
84 rt_handle.clone(),
85 )
86 }
87
88 fn run_internal(
90 &self,
91 rt_handle: &tokio::runtime::Handle,
92 ) -> (
93 JoinSet<Result<Never, FatalError>>,
94 tokio::sync::mpsc::Receiver<ExternalUpdates<ResponseMessage>>,
95 ) {
96 let (output_queue_sender, output_queue_reader) = tokio::sync::mpsc::channel(3);
97
98 let mut join_set = JoinSet::new();
99 join_set.spawn_on(
100 {
101 let command_reader = self.command_reader.resubscribe();
102 let command_sender = self.command_sender.clone();
103 let provider = self.provider.clone();
104 let runtime_handle = rt_handle.clone();
105 let hb = self.hb.clone();
106 async move {
107 cron::StreamConnectionProcessor {
108 sender: output_queue_sender,
109 command_reader,
110 command_sender,
111 provider,
112 runtime_handle,
113 hb,
114 last_time_token_refreshed: None,
115 }
116 .connect_and_process_loop()
117 .await
118 }
119 },
120 rt_handle,
121 );
122
123 (join_set, output_queue_reader)
124 }
125}
126
127pub(crate) fn wrap_with_cache_layer(
129 join_set: &mut JoinSet<Result<Never, FatalError>>,
130 data_feed: tokio::sync::mpsc::Receiver<ExternalUpdates<ResponseMessage>>,
131 rt_handle: &tokio::runtime::Handle,
132) -> tokio::sync::mpsc::Receiver<ExternalUpdates<CacheEnabledMessages>> {
133 let (output_queue_sender_post_cache, output_queue_reader_post_cache) =
134 tokio::sync::mpsc::channel(3);
135 join_set.spawn_on(
136 cron::cache_loop(data_feed, output_queue_sender_post_cache),
137 rt_handle,
138 );
139 output_queue_reader_post_cache
140}