1use nautilus_core::{Params, UUID4};
22use nautilus_model::{
23 defi::{Block, Blockchain, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap},
24 identifiers::{ClientId, InstrumentId},
25};
26
27use crate::{
28 actor::DataActorCore,
29 defi::{
30 DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks, SubscribePool,
31 SubscribePoolFeeCollects, SubscribePoolFlashEvents, SubscribePoolLiquidityUpdates,
32 SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool, UnsubscribePoolFeeCollects,
33 UnsubscribePoolFlashEvents, UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
34 switchboard::{
35 get_defi_blocks_topic, get_defi_collect_topic, get_defi_flash_topic,
36 get_defi_liquidity_topic, get_defi_pool_swaps_topic, get_defi_pool_topic,
37 },
38 },
39 messages::data::DataCommand,
40 msgbus::{MStr, Topic, TypedHandler},
41};
42
43impl DataActorCore {
44 pub fn subscribe_blocks(
46 &mut self,
47 topic: MStr<Topic>,
48 handler: TypedHandler<Block>,
49 chain: Blockchain,
50 client_id: Option<ClientId>,
51 params: Option<Params>,
52 ) {
53 self.check_registered();
54
55 self.add_block_subscription(topic, handler);
56
57 let command = DefiSubscribeCommand::Blocks(SubscribeBlocks {
58 chain,
59 client_id,
60 command_id: UUID4::new(),
61 ts_init: self.timestamp_ns(),
62 params,
63 });
64
65 self.send_data_cmd(DataCommand::DefiSubscribe(command));
66 }
67
68 pub fn subscribe_pool(
70 &mut self,
71 topic: MStr<Topic>,
72 handler: TypedHandler<Pool>,
73 instrument_id: InstrumentId,
74 client_id: Option<ClientId>,
75 params: Option<Params>,
76 ) {
77 self.check_registered();
78
79 self.add_pool_subscription(topic, handler);
80
81 let command = DefiSubscribeCommand::Pool(SubscribePool {
82 instrument_id,
83 client_id,
84 command_id: UUID4::new(),
85 ts_init: self.timestamp_ns(),
86 params,
87 });
88
89 self.send_data_cmd(DataCommand::DefiSubscribe(command));
90 }
91
92 pub fn subscribe_pool_swaps(
94 &mut self,
95 topic: MStr<Topic>,
96 handler: TypedHandler<PoolSwap>,
97 instrument_id: InstrumentId,
98 client_id: Option<ClientId>,
99 params: Option<Params>,
100 ) {
101 self.check_registered();
102
103 self.add_pool_swap_subscription(topic, handler);
104
105 let command = DefiSubscribeCommand::PoolSwaps(SubscribePoolSwaps {
106 instrument_id,
107 client_id,
108 command_id: UUID4::new(),
109 ts_init: self.timestamp_ns(),
110 params,
111 });
112
113 self.send_data_cmd(DataCommand::DefiSubscribe(command));
114 }
115
116 pub fn subscribe_pool_liquidity_updates(
118 &mut self,
119 topic: MStr<Topic>,
120 handler: TypedHandler<PoolLiquidityUpdate>,
121 instrument_id: InstrumentId,
122 client_id: Option<ClientId>,
123 params: Option<Params>,
124 ) {
125 self.check_registered();
126
127 self.add_pool_liquidity_subscription(topic, handler);
128
129 let command = DefiSubscribeCommand::PoolLiquidityUpdates(SubscribePoolLiquidityUpdates {
130 instrument_id,
131 client_id,
132 command_id: UUID4::new(),
133 ts_init: self.timestamp_ns(),
134 params,
135 });
136
137 self.send_data_cmd(DataCommand::DefiSubscribe(command));
138 }
139
140 pub fn subscribe_pool_fee_collects(
142 &mut self,
143 topic: MStr<Topic>,
144 handler: TypedHandler<PoolFeeCollect>,
145 instrument_id: InstrumentId,
146 client_id: Option<ClientId>,
147 params: Option<Params>,
148 ) {
149 self.check_registered();
150
151 self.add_pool_collect_subscription(topic, handler);
152
153 let command = DefiSubscribeCommand::PoolFeeCollects(SubscribePoolFeeCollects {
154 instrument_id,
155 client_id,
156 command_id: UUID4::new(),
157 ts_init: self.timestamp_ns(),
158 params,
159 });
160
161 self.send_data_cmd(DataCommand::DefiSubscribe(command));
162 }
163
164 pub fn subscribe_pool_flash_events(
166 &mut self,
167 topic: MStr<Topic>,
168 handler: TypedHandler<PoolFlash>,
169 instrument_id: InstrumentId,
170 client_id: Option<ClientId>,
171 params: Option<Params>,
172 ) {
173 self.check_registered();
174
175 self.add_pool_flash_subscription(topic, handler);
176
177 let command = DefiSubscribeCommand::PoolFlashEvents(SubscribePoolFlashEvents {
178 instrument_id,
179 client_id,
180 command_id: UUID4::new(),
181 ts_init: self.timestamp_ns(),
182 params,
183 });
184
185 self.send_data_cmd(DataCommand::DefiSubscribe(command));
186 }
187
188 pub fn unsubscribe_blocks(
190 &mut self,
191 chain: Blockchain,
192 client_id: Option<ClientId>,
193 params: Option<Params>,
194 ) {
195 self.check_registered();
196
197 let topic = get_defi_blocks_topic(chain);
198 self.remove_block_subscription(topic);
199
200 let command = DefiUnsubscribeCommand::Blocks(UnsubscribeBlocks {
201 chain,
202 client_id,
203 command_id: UUID4::new(),
204 ts_init: self.timestamp_ns(),
205 params,
206 });
207
208 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
209 }
210
211 pub fn unsubscribe_pool(
213 &mut self,
214 instrument_id: InstrumentId,
215 client_id: Option<ClientId>,
216 params: Option<Params>,
217 ) {
218 self.check_registered();
219
220 let topic = get_defi_pool_topic(instrument_id);
221 self.remove_pool_subscription(topic);
222
223 let command = DefiUnsubscribeCommand::Pool(UnsubscribePool {
224 instrument_id,
225 client_id,
226 command_id: UUID4::new(),
227 ts_init: self.timestamp_ns(),
228 params,
229 });
230
231 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
232 }
233
234 pub fn unsubscribe_pool_swaps(
236 &mut self,
237 instrument_id: InstrumentId,
238 client_id: Option<ClientId>,
239 params: Option<Params>,
240 ) {
241 self.check_registered();
242
243 let topic = get_defi_pool_swaps_topic(instrument_id);
244 self.remove_pool_swap_subscription(topic);
245
246 let command = DefiUnsubscribeCommand::PoolSwaps(UnsubscribePoolSwaps {
247 instrument_id,
248 client_id,
249 command_id: UUID4::new(),
250 ts_init: self.timestamp_ns(),
251 params,
252 });
253
254 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
255 }
256
257 pub fn unsubscribe_pool_liquidity_updates(
259 &mut self,
260 instrument_id: InstrumentId,
261 client_id: Option<ClientId>,
262 params: Option<Params>,
263 ) {
264 self.check_registered();
265
266 let topic = get_defi_liquidity_topic(instrument_id);
267 self.remove_pool_liquidity_subscription(topic);
268
269 let command =
270 DefiUnsubscribeCommand::PoolLiquidityUpdates(UnsubscribePoolLiquidityUpdates {
271 instrument_id,
272 client_id,
273 command_id: UUID4::new(),
274 ts_init: self.timestamp_ns(),
275 params,
276 });
277
278 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
279 }
280
281 pub fn unsubscribe_pool_fee_collects(
283 &mut self,
284 instrument_id: InstrumentId,
285 client_id: Option<ClientId>,
286 params: Option<Params>,
287 ) {
288 self.check_registered();
289
290 let topic = get_defi_collect_topic(instrument_id);
291 self.remove_pool_collect_subscription(topic);
292
293 let command = DefiUnsubscribeCommand::PoolFeeCollects(UnsubscribePoolFeeCollects {
294 instrument_id,
295 client_id,
296 command_id: UUID4::new(),
297 ts_init: self.timestamp_ns(),
298 params,
299 });
300
301 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
302 }
303
304 pub fn unsubscribe_pool_flash_events(
306 &mut self,
307 instrument_id: InstrumentId,
308 client_id: Option<ClientId>,
309 params: Option<Params>,
310 ) {
311 self.check_registered();
312
313 let topic = get_defi_flash_topic(instrument_id);
314 self.remove_pool_flash_subscription(topic);
315
316 let command = DefiUnsubscribeCommand::PoolFlashEvents(UnsubscribePoolFlashEvents {
317 instrument_id,
318 client_id,
319 command_id: UUID4::new(),
320 ts_init: self.timestamp_ns(),
321 params,
322 });
323
324 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
325 }
326}