Skip to main content

betfair_stream_api/cache/
order_subscriber.rs

1use betfair_adapter::betfair_types::customer_strategy_ref::CustomerStrategyRef;
2use betfair_stream_types::request::RequestMessage;
3use betfair_stream_types::request::order_subscription_message::{
4    OrderFilter, OrderSubscriptionMessage,
5};
6use tokio::sync::mpsc::Sender;
7
8use crate::{BetfairStreamClient, MessageProcessor};
9
10/// A wrapper around a `StreamListener` that allows subscribing to order updates with a somewhat
11/// ergonomic API.
12pub struct OrderSubscriber {
13    command_sender: Sender<RequestMessage>,
14    filter: OrderFilter,
15}
16
17impl OrderSubscriber {
18    #[must_use]
19    pub fn new<T: MessageProcessor>(
20        stream_api_connection: &BetfairStreamClient<T>,
21        filter: OrderFilter,
22    ) -> Self {
23        let command_sender = stream_api_connection.send_to_stream.clone();
24        Self {
25            command_sender,
26            filter,
27        }
28    }
29
30    /// Create a new market subscriber.
31    ///
32    /// # Errors
33    /// If the message cannot be sent to the stream.
34    pub async fn subscribe_to_strategy_updates(
35        &mut self,
36        strategy_ref: CustomerStrategyRef,
37    ) -> Result<(), tokio::sync::mpsc::error::SendError<RequestMessage>> {
38        if let Some(ref mut strategy_refs) = self.filter.customer_strategy_refs {
39            strategy_refs.push(strategy_ref);
40        } else {
41            self.filter.customer_strategy_refs = Some(vec![strategy_ref]);
42        }
43
44        self.resubscribe().await
45    }
46
47    /// Unsubscribe from a market.
48    ///
49    /// # Errors
50    /// If the message cannot be sent to the stream.
51    pub async fn unsubscribe_from_strategy_updates(
52        &mut self,
53        strategy_ref: &CustomerStrategyRef,
54    ) -> Result<(), tokio::sync::mpsc::error::SendError<RequestMessage>> {
55        if let Some(x) = self.filter.customer_strategy_refs.as_mut() {
56            x.retain(|iter_strategy_ref| iter_strategy_ref != strategy_ref);
57        }
58
59        if self
60            .filter
61            .customer_strategy_refs
62            .as_ref()
63            .is_none_or(alloc::vec::Vec::is_empty)
64        {
65            self.unsubscribe_from_all_markets().await?;
66        }
67
68        Ok(())
69    }
70
71    /// Unsubscribe from all markets.
72    ///
73    /// Internally it uses a weird trick of subscribing to a market that does not exist to simulate
74    /// unsubscribing from all markets.
75    /// [betfair docs](https://forum.developer.betfair.com/forum/sports-exchange-api/exchange-api/34555-stream-api-unsubscribe-from-all-markets)
76    ///
77    /// # Errors
78    /// if the message cannot be sent to the stream.
79    pub async fn unsubscribe_from_all_markets(
80        &mut self,
81    ) -> Result<(), tokio::sync::mpsc::error::SendError<RequestMessage>> {
82        let strategy_that_does_not_exist = CustomerStrategyRef::new([
83            'd', 'o', 's', 'e', 'n', 't', ' ', 'e', 'x', 'i', 's', 't', ' ', ' ', ' ',
84        ]);
85        self.filter = OrderFilter::default();
86
87        let req = RequestMessage::OrderSubscription(OrderSubscriptionMessage {
88            id: None,
89            segmentation_enabled: Some(true),
90            clk: None,
91            heartbeat_ms: Some(5000),
92            initial_clk: None,
93            order_filter: Some(Box::new(OrderFilter {
94                include_overall_position: Some(false),
95                account_ids: None,
96                customer_strategy_refs: Some(vec![strategy_that_does_not_exist]),
97                partition_matched_by_strategy_ref: None,
98            })),
99            conflate_ms: None,
100        });
101        self.command_sender.send(req).await?;
102
103        Ok(())
104    }
105
106    /// Resubscribe to the stream.
107    ///
108    /// This is useful when the stream is disconnected and you want to resubscribe to the stream.
109    ///
110    /// # Errors
111    /// if the stream fails to send the message
112    pub async fn resubscribe(
113        &self,
114    ) -> Result<(), tokio::sync::mpsc::error::SendError<RequestMessage>> {
115        let req = RequestMessage::OrderSubscription(OrderSubscriptionMessage {
116            id: None,
117            clk: None,         // empty to reset the clock
118            initial_clk: None, // empty to reset the clock
119            segmentation_enabled: Some(true),
120            heartbeat_ms: Some(5000),
121            order_filter: Some(Box::new(self.filter.clone())),
122            conflate_ms: None,
123        });
124        self.command_sender.send(req).await?;
125
126        Ok(())
127    }
128
129    #[must_use]
130    pub const fn filter(&self) -> &OrderFilter {
131        &self.filter
132    }
133
134    /// Set the filter for the subscriber.
135    ///
136    /// # Errors
137    /// if the stream fails to send the message
138    pub async fn set_filter(
139        &mut self,
140        filter: OrderFilter,
141    ) -> Result<(), tokio::sync::mpsc::error::SendError<RequestMessage>> {
142        self.filter = filter;
143        self.resubscribe().await
144    }
145}