Skip to main content

nautilus_common/defi/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! DeFi-specific actor functionality.
17//!
18//! This module provides DeFi subscription and unsubscription helper methods
19//! for the `DataActorCore`. All code in this module requires the `defi` feature flag.
20
21use nautilus_core::{Params, UUID4};
22use nautilus_model::{
23    defi::{Block, Blockchain, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap},
24    identifiers::{ClientId, InstrumentId},
25};
26
27use crate::{
28    actor::DataActorCore,
29    defi::{
30        DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks, SubscribePool,
31        SubscribePoolFeeCollects, SubscribePoolFlashEvents, SubscribePoolLiquidityUpdates,
32        SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool, UnsubscribePoolFeeCollects,
33        UnsubscribePoolFlashEvents, UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
34        switchboard::{
35            get_defi_blocks_topic, get_defi_collect_topic, get_defi_flash_topic,
36            get_defi_liquidity_topic, get_defi_pool_swaps_topic, get_defi_pool_topic,
37        },
38    },
39    messages::data::DataCommand,
40    msgbus::{MStr, Topic, TypedHandler},
41};
42
43impl DataActorCore {
44    /// Helper method for registering block subscriptions from the trait.
45    pub fn subscribe_blocks(
46        &mut self,
47        topic: MStr<Topic>,
48        handler: TypedHandler<Block>,
49        chain: Blockchain,
50        client_id: Option<ClientId>,
51        params: Option<Params>,
52    ) {
53        self.check_registered();
54
55        self.add_block_subscription(topic, handler);
56
57        let command = DefiSubscribeCommand::Blocks(SubscribeBlocks {
58            chain,
59            client_id,
60            command_id: UUID4::new(),
61            ts_init: self.timestamp_ns(),
62            params,
63        });
64
65        self.send_data_cmd(DataCommand::DefiSubscribe(command));
66    }
67
68    /// Helper method for registering pool subscriptions from the trait.
69    pub fn subscribe_pool(
70        &mut self,
71        topic: MStr<Topic>,
72        handler: TypedHandler<Pool>,
73        instrument_id: InstrumentId,
74        client_id: Option<ClientId>,
75        params: Option<Params>,
76    ) {
77        self.check_registered();
78
79        self.add_pool_subscription(topic, handler);
80
81        let command = DefiSubscribeCommand::Pool(SubscribePool {
82            instrument_id,
83            client_id,
84            command_id: UUID4::new(),
85            ts_init: self.timestamp_ns(),
86            params,
87        });
88
89        self.send_data_cmd(DataCommand::DefiSubscribe(command));
90    }
91
92    /// Helper method for registering pool swap subscriptions from the trait.
93    pub fn subscribe_pool_swaps(
94        &mut self,
95        topic: MStr<Topic>,
96        handler: TypedHandler<PoolSwap>,
97        instrument_id: InstrumentId,
98        client_id: Option<ClientId>,
99        params: Option<Params>,
100    ) {
101        self.check_registered();
102
103        self.add_pool_swap_subscription(topic, handler);
104
105        let command = DefiSubscribeCommand::PoolSwaps(SubscribePoolSwaps {
106            instrument_id,
107            client_id,
108            command_id: UUID4::new(),
109            ts_init: self.timestamp_ns(),
110            params,
111        });
112
113        self.send_data_cmd(DataCommand::DefiSubscribe(command));
114    }
115
116    /// Helper method for registering pool liquidity update subscriptions from the trait.
117    pub fn subscribe_pool_liquidity_updates(
118        &mut self,
119        topic: MStr<Topic>,
120        handler: TypedHandler<PoolLiquidityUpdate>,
121        instrument_id: InstrumentId,
122        client_id: Option<ClientId>,
123        params: Option<Params>,
124    ) {
125        self.check_registered();
126
127        self.add_pool_liquidity_subscription(topic, handler);
128
129        let command = DefiSubscribeCommand::PoolLiquidityUpdates(SubscribePoolLiquidityUpdates {
130            instrument_id,
131            client_id,
132            command_id: UUID4::new(),
133            ts_init: self.timestamp_ns(),
134            params,
135        });
136
137        self.send_data_cmd(DataCommand::DefiSubscribe(command));
138    }
139
140    /// Helper method for registering pool fee collect subscriptions from the trait.
141    pub fn subscribe_pool_fee_collects(
142        &mut self,
143        topic: MStr<Topic>,
144        handler: TypedHandler<PoolFeeCollect>,
145        instrument_id: InstrumentId,
146        client_id: Option<ClientId>,
147        params: Option<Params>,
148    ) {
149        self.check_registered();
150
151        self.add_pool_collect_subscription(topic, handler);
152
153        let command = DefiSubscribeCommand::PoolFeeCollects(SubscribePoolFeeCollects {
154            instrument_id,
155            client_id,
156            command_id: UUID4::new(),
157            ts_init: self.timestamp_ns(),
158            params,
159        });
160
161        self.send_data_cmd(DataCommand::DefiSubscribe(command));
162    }
163
164    /// Helper method for registering pool flash event subscriptions from the trait.
165    pub fn subscribe_pool_flash_events(
166        &mut self,
167        topic: MStr<Topic>,
168        handler: TypedHandler<PoolFlash>,
169        instrument_id: InstrumentId,
170        client_id: Option<ClientId>,
171        params: Option<Params>,
172    ) {
173        self.check_registered();
174
175        self.add_pool_flash_subscription(topic, handler);
176
177        let command = DefiSubscribeCommand::PoolFlashEvents(SubscribePoolFlashEvents {
178            instrument_id,
179            client_id,
180            command_id: UUID4::new(),
181            ts_init: self.timestamp_ns(),
182            params,
183        });
184
185        self.send_data_cmd(DataCommand::DefiSubscribe(command));
186    }
187
188    /// Helper method for unsubscribing from blocks.
189    pub fn unsubscribe_blocks(
190        &mut self,
191        chain: Blockchain,
192        client_id: Option<ClientId>,
193        params: Option<Params>,
194    ) {
195        self.check_registered();
196
197        let topic = get_defi_blocks_topic(chain);
198        self.remove_block_subscription(topic);
199
200        let command = DefiUnsubscribeCommand::Blocks(UnsubscribeBlocks {
201            chain,
202            client_id,
203            command_id: UUID4::new(),
204            ts_init: self.timestamp_ns(),
205            params,
206        });
207
208        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
209    }
210
211    /// Helper method for unsubscribing from pool definition updates.
212    pub fn unsubscribe_pool(
213        &mut self,
214        instrument_id: InstrumentId,
215        client_id: Option<ClientId>,
216        params: Option<Params>,
217    ) {
218        self.check_registered();
219
220        let topic = get_defi_pool_topic(instrument_id);
221        self.remove_pool_subscription(topic);
222
223        let command = DefiUnsubscribeCommand::Pool(UnsubscribePool {
224            instrument_id,
225            client_id,
226            command_id: UUID4::new(),
227            ts_init: self.timestamp_ns(),
228            params,
229        });
230
231        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
232    }
233
234    /// Helper method for unsubscribing from pool swaps.
235    pub fn unsubscribe_pool_swaps(
236        &mut self,
237        instrument_id: InstrumentId,
238        client_id: Option<ClientId>,
239        params: Option<Params>,
240    ) {
241        self.check_registered();
242
243        let topic = get_defi_pool_swaps_topic(instrument_id);
244        self.remove_pool_swap_subscription(topic);
245
246        let command = DefiUnsubscribeCommand::PoolSwaps(UnsubscribePoolSwaps {
247            instrument_id,
248            client_id,
249            command_id: UUID4::new(),
250            ts_init: self.timestamp_ns(),
251            params,
252        });
253
254        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
255    }
256
257    /// Helper method for unsubscribing from pool liquidity updates.
258    pub fn unsubscribe_pool_liquidity_updates(
259        &mut self,
260        instrument_id: InstrumentId,
261        client_id: Option<ClientId>,
262        params: Option<Params>,
263    ) {
264        self.check_registered();
265
266        let topic = get_defi_liquidity_topic(instrument_id);
267        self.remove_pool_liquidity_subscription(topic);
268
269        let command =
270            DefiUnsubscribeCommand::PoolLiquidityUpdates(UnsubscribePoolLiquidityUpdates {
271                instrument_id,
272                client_id,
273                command_id: UUID4::new(),
274                ts_init: self.timestamp_ns(),
275                params,
276            });
277
278        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
279    }
280
281    /// Helper method for unsubscribing from pool fee collects.
282    pub fn unsubscribe_pool_fee_collects(
283        &mut self,
284        instrument_id: InstrumentId,
285        client_id: Option<ClientId>,
286        params: Option<Params>,
287    ) {
288        self.check_registered();
289
290        let topic = get_defi_collect_topic(instrument_id);
291        self.remove_pool_collect_subscription(topic);
292
293        let command = DefiUnsubscribeCommand::PoolFeeCollects(UnsubscribePoolFeeCollects {
294            instrument_id,
295            client_id,
296            command_id: UUID4::new(),
297            ts_init: self.timestamp_ns(),
298            params,
299        });
300
301        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
302    }
303
304    /// Helper method for unsubscribing from pool flash events.
305    pub fn unsubscribe_pool_flash_events(
306        &mut self,
307        instrument_id: InstrumentId,
308        client_id: Option<ClientId>,
309        params: Option<Params>,
310    ) {
311        self.check_registered();
312
313        let topic = get_defi_flash_topic(instrument_id);
314        self.remove_pool_flash_subscription(topic);
315
316        let command = DefiUnsubscribeCommand::PoolFlashEvents(UnsubscribePoolFlashEvents {
317            instrument_id,
318            client_id,
319            command_id: UUID4::new(),
320            ts_init: self.timestamp_ns(),
321            params,
322        });
323
324        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
325    }
326}