Skip to main content

nautilus_data/engine/
pool.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Message handler that maintains the `Pool` state stored in the global [`Cache`].
17//!
18//! The handler is functionally equivalent to `BookUpdater` but for DeFi liquidity
19//! pools. Whenever a [`PoolSwap`] or [`PoolLiquidityUpdate`] is published on the
20//! message bus the handler looks up the corresponding `Pool` instance in the
21//! cache and applies the change in-place (for now we only update the `ts_init`
22//! timestamp so that consumers can tell the pool has been touched).
23
24use std::{cell::RefCell, rc::Rc};
25
26use nautilus_common::{cache::Cache, msgbus::Handler};
27use nautilus_model::{
28    defi::{PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap},
29    identifiers::InstrumentId,
30};
31use ustr::Ustr;
32
33/// Handles [`PoolSwap`]s and [`PoolLiquidityUpdate`]s for a single AMM pool.
34#[derive(Debug)]
35pub struct PoolUpdater {
36    id: Ustr,
37    instrument_id: InstrumentId,
38    cache: Rc<RefCell<Cache>>,
39}
40
41impl PoolUpdater {
42    /// Creates a new [`PoolUpdater`] bound to the given `instrument_id` and `cache`.
43    #[must_use]
44    pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
45        Self {
46            id: Ustr::from(&format!("{}-{}", stringify!(PoolUpdater), instrument_id)),
47            instrument_id: *instrument_id,
48            cache,
49        }
50    }
51
52    /// Returns the handler ID.
53    #[must_use]
54    pub fn id(&self) -> Ustr {
55        self.id
56    }
57
58    /// Handles a pool swap event.
59    pub fn handle_pool_swap(&self, swap: &PoolSwap) {
60        if let Some(pool_profiler) = self
61            .cache
62            .borrow_mut()
63            .pool_profiler_mut(&self.instrument_id)
64            && let Err(e) = pool_profiler.process_swap(swap)
65        {
66            log::error!("Failed to process pool swap: {e}");
67        }
68    }
69
70    /// Handles a pool liquidity update event.
71    ///
72    /// # Panics
73    ///
74    /// Panics if `update.kind` is not `Mint` or `Burn`.
75    pub fn handle_pool_liquidity_update(&self, update: &PoolLiquidityUpdate) {
76        if let Some(pool_profiler) = self
77            .cache
78            .borrow_mut()
79            .pool_profiler_mut(&self.instrument_id)
80            && let Err(e) = match update.kind {
81                PoolLiquidityUpdateType::Mint => pool_profiler.process_mint(update),
82                PoolLiquidityUpdateType::Burn => pool_profiler.process_burn(update),
83                _ => panic!("Liquidity update operation {} not implemented", update.kind),
84            }
85        {
86            log::error!("Failed to process pool liquidity update: {e}");
87        }
88    }
89
90    /// Handles a pool fee collect event.
91    pub fn handle_pool_fee_collect(&self, event: &PoolFeeCollect) {
92        if let Some(pool_profiler) = self
93            .cache
94            .borrow_mut()
95            .pool_profiler_mut(&self.instrument_id)
96            && let Err(e) = pool_profiler.process_collect(event)
97        {
98            log::error!("Failed to process pool fee collect: {e}");
99        }
100    }
101
102    /// Handles a pool flash event.
103    pub fn handle_pool_flash(&self, event: &PoolFlash) {
104        if let Some(pool_profiler) = self
105            .cache
106            .borrow_mut()
107            .pool_profiler_mut(&self.instrument_id)
108            && let Err(e) = pool_profiler.process_flash(event)
109        {
110            log::error!("Failed to process pool flash: {e}");
111        }
112    }
113}
114
115/// Handler for pool swap events that delegates to a [`PoolUpdater`].
116#[derive(Debug)]
117pub struct PoolSwapHandler {
118    id: Ustr,
119    updater: Rc<PoolUpdater>,
120}
121
122impl PoolSwapHandler {
123    /// Creates a new swap handler delegating to the given updater.
124    #[must_use]
125    pub fn new(updater: Rc<PoolUpdater>) -> Self {
126        Self {
127            id: Ustr::from(&format!("PoolSwapHandler-{}", updater.id())),
128            updater,
129        }
130    }
131}
132
133impl Handler<PoolSwap> for PoolSwapHandler {
134    fn id(&self) -> Ustr {
135        self.id
136    }
137
138    fn handle(&self, msg: &PoolSwap) {
139        self.updater.handle_pool_swap(msg);
140    }
141}
142
143/// Handler for pool liquidity update events that delegates to a [`PoolUpdater`].
144#[derive(Debug)]
145pub struct PoolLiquidityHandler {
146    id: Ustr,
147    updater: Rc<PoolUpdater>,
148}
149
150impl PoolLiquidityHandler {
151    /// Creates a new liquidity handler delegating to the given updater.
152    #[must_use]
153    pub fn new(updater: Rc<PoolUpdater>) -> Self {
154        Self {
155            id: Ustr::from(&format!("PoolLiquidityHandler-{}", updater.id())),
156            updater,
157        }
158    }
159}
160
161impl Handler<PoolLiquidityUpdate> for PoolLiquidityHandler {
162    fn id(&self) -> Ustr {
163        self.id
164    }
165
166    fn handle(&self, msg: &PoolLiquidityUpdate) {
167        self.updater.handle_pool_liquidity_update(msg);
168    }
169}
170
171/// Handler for pool fee collect events that delegates to a [`PoolUpdater`].
172#[derive(Debug)]
173pub struct PoolCollectHandler {
174    id: Ustr,
175    updater: Rc<PoolUpdater>,
176}
177
178impl PoolCollectHandler {
179    /// Creates a new collect handler delegating to the given updater.
180    #[must_use]
181    pub fn new(updater: Rc<PoolUpdater>) -> Self {
182        Self {
183            id: Ustr::from(&format!("PoolCollectHandler-{}", updater.id())),
184            updater,
185        }
186    }
187}
188
189impl Handler<PoolFeeCollect> for PoolCollectHandler {
190    fn id(&self) -> Ustr {
191        self.id
192    }
193
194    fn handle(&self, msg: &PoolFeeCollect) {
195        self.updater.handle_pool_fee_collect(msg);
196    }
197}
198
199/// Handler for pool flash events that delegates to a [`PoolUpdater`].
200#[derive(Debug)]
201pub struct PoolFlashHandler {
202    id: Ustr,
203    updater: Rc<PoolUpdater>,
204}
205
206impl PoolFlashHandler {
207    /// Creates a new flash handler delegating to the given updater.
208    #[must_use]
209    pub fn new(updater: Rc<PoolUpdater>) -> Self {
210        Self {
211            id: Ustr::from(&format!("PoolFlashHandler-{}", updater.id())),
212            updater,
213        }
214    }
215}
216
217impl Handler<PoolFlash> for PoolFlashHandler {
218    fn id(&self) -> Ustr {
219        self.id
220    }
221
222    fn handle(&self, msg: &PoolFlash) {
223        self.updater.handle_pool_flash(msg);
224    }
225}