betfair_stream_api/cache/
order_subscriber.rs1use betfair_adapter::betfair_types::customer_strategy_ref::CustomerStrategyRef;
2use betfair_stream_types::request::RequestMessage;
3use betfair_stream_types::request::order_subscription_message::{
4 OrderFilter, OrderSubscriptionMessage,
5};
6use tokio::sync::mpsc::Sender;
7
8use crate::{BetfairStreamClient, MessageProcessor};
9
10pub struct OrderSubscriber {
13 command_sender: Sender<RequestMessage>,
14 filter: OrderFilter,
15}
16
17impl OrderSubscriber {
18 #[must_use]
19 pub fn new<T: MessageProcessor>(
20 stream_api_connection: &BetfairStreamClient<T>,
21 filter: OrderFilter,
22 ) -> Self {
23 let command_sender = stream_api_connection.send_to_stream.clone();
24 Self {
25 command_sender,
26 filter,
27 }
28 }
29
30 pub async fn subscribe_to_strategy_updates(
35 &mut self,
36 strategy_ref: CustomerStrategyRef,
37 ) -> Result<(), tokio::sync::mpsc::error::SendError<RequestMessage>> {
38 if let Some(ref mut strategy_refs) = self.filter.customer_strategy_refs {
39 strategy_refs.push(strategy_ref);
40 } else {
41 self.filter.customer_strategy_refs = Some(vec![strategy_ref]);
42 }
43
44 self.resubscribe().await
45 }
46
47 pub async fn unsubscribe_from_strategy_updates(
52 &mut self,
53 strategy_ref: &CustomerStrategyRef,
54 ) -> Result<(), tokio::sync::mpsc::error::SendError<RequestMessage>> {
55 if let Some(x) = self.filter.customer_strategy_refs.as_mut() {
56 x.retain(|iter_strategy_ref| iter_strategy_ref != strategy_ref);
57 }
58
59 if self
60 .filter
61 .customer_strategy_refs
62 .as_ref()
63 .is_none_or(alloc::vec::Vec::is_empty)
64 {
65 self.unsubscribe_from_all_markets().await?;
66 }
67
68 Ok(())
69 }
70
71 pub async fn unsubscribe_from_all_markets(
80 &mut self,
81 ) -> Result<(), tokio::sync::mpsc::error::SendError<RequestMessage>> {
82 let strategy_that_does_not_exist = CustomerStrategyRef::new([
83 'd', 'o', 's', 'e', 'n', 't', ' ', 'e', 'x', 'i', 's', 't', ' ', ' ', ' ',
84 ]);
85 self.filter = OrderFilter::default();
86
87 let req = RequestMessage::OrderSubscription(OrderSubscriptionMessage {
88 id: None,
89 segmentation_enabled: Some(true),
90 clk: None,
91 heartbeat_ms: Some(5000),
92 initial_clk: None,
93 order_filter: Some(Box::new(OrderFilter {
94 include_overall_position: Some(false),
95 account_ids: None,
96 customer_strategy_refs: Some(vec![strategy_that_does_not_exist]),
97 partition_matched_by_strategy_ref: None,
98 })),
99 conflate_ms: None,
100 });
101 self.command_sender.send(req).await?;
102
103 Ok(())
104 }
105
106 pub async fn resubscribe(
113 &self,
114 ) -> Result<(), tokio::sync::mpsc::error::SendError<RequestMessage>> {
115 let req = RequestMessage::OrderSubscription(OrderSubscriptionMessage {
116 id: None,
117 clk: None, initial_clk: None, segmentation_enabled: Some(true),
120 heartbeat_ms: Some(5000),
121 order_filter: Some(Box::new(self.filter.clone())),
122 conflate_ms: None,
123 });
124 self.command_sender.send(req).await?;
125
126 Ok(())
127 }
128
129 #[must_use]
130 pub const fn filter(&self) -> &OrderFilter {
131 &self.filter
132 }
133
134 pub async fn set_filter(
139 &mut self,
140 filter: OrderFilter,
141 ) -> Result<(), tokio::sync::mpsc::error::SendError<RequestMessage>> {
142 self.filter = filter;
143 self.resubscribe().await
144 }
145}