Skip to main content

nautilus_data/engine/
pool.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Message handler that maintains the `Pool` state stored in the global [`Cache`].
17//!
18//! The handler is functionally equivalent to `BookUpdater` but for DeFi liquidity
19//! pools. Whenever a [`PoolSwap`] or [`PoolLiquidityUpdate`] is published on the
20//! message bus the handler looks up the corresponding `Pool` instance in the
21//! cache and applies the change in-place (for now we only update the `ts_init`
22//! timestamp so that consumers can tell the pool has been touched).
23
24use std::{cell::RefCell, rc::Rc};
25
26use nautilus_common::{cache::Cache, msgbus::Handler};
27use nautilus_model::{
28    defi::{
29        PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolProfiler,
30        PoolSwap,
31    },
32    identifiers::InstrumentId,
33};
34use ustr::Ustr;
35
36/// Handles [`PoolSwap`]s and [`PoolLiquidityUpdate`]s for a single AMM pool.
37#[derive(Debug)]
38pub struct PoolUpdater {
39    id: Ustr,
40    instrument_id: InstrumentId,
41    cache: Rc<RefCell<Cache>>,
42}
43
44impl PoolUpdater {
45    /// Creates a new [`PoolUpdater`] bound to the given `instrument_id` and `cache`.
46    #[must_use]
47    pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
48        Self {
49            id: Ustr::from(&format!("{}-{}", stringify!(PoolUpdater), instrument_id)),
50            instrument_id: *instrument_id,
51            cache,
52        }
53    }
54
55    /// Returns the handler ID.
56    #[must_use]
57    pub fn id(&self) -> Ustr {
58        self.id
59    }
60
61    /// Handles a pool swap event.
62    pub fn handle_pool_swap(&self, swap: &PoolSwap) {
63        if let Some(pool_profiler) = self
64            .cache
65            .borrow_mut()
66            .pool_profiler_mut(&self.instrument_id)
67            && let Err(e) = pool_profiler.process_swap(swap)
68        {
69            log::error!("Failed to process pool swap: {e}");
70        }
71    }
72
73    /// Handles a pool liquidity update event.
74    pub fn handle_pool_liquidity_update(&self, update: &PoolLiquidityUpdate) {
75        if let Some(pool_profiler) = self
76            .cache
77            .borrow_mut()
78            .pool_profiler_mut(&self.instrument_id)
79            && let Err(e) = process_pool_liquidity_update(pool_profiler, update)
80        {
81            log::error!("Failed to process pool liquidity update: {e}");
82        }
83    }
84
85    /// Handles a pool fee collect event.
86    pub fn handle_pool_fee_collect(&self, event: &PoolFeeCollect) {
87        if let Some(pool_profiler) = self
88            .cache
89            .borrow_mut()
90            .pool_profiler_mut(&self.instrument_id)
91            && let Err(e) = pool_profiler.process_collect(event)
92        {
93            log::error!("Failed to process pool fee collect: {e}");
94        }
95    }
96
97    /// Handles a pool flash event.
98    pub fn handle_pool_flash(&self, event: &PoolFlash) {
99        if let Some(pool_profiler) = self
100            .cache
101            .borrow_mut()
102            .pool_profiler_mut(&self.instrument_id)
103            && let Err(e) = pool_profiler.process_flash(event)
104        {
105            log::error!("Failed to process pool flash: {e}");
106        }
107    }
108}
109
110fn process_pool_liquidity_update(
111    pool_profiler: &mut PoolProfiler,
112    update: &PoolLiquidityUpdate,
113) -> anyhow::Result<()> {
114    match update.kind {
115        PoolLiquidityUpdateType::Mint => pool_profiler.process_mint(update),
116        PoolLiquidityUpdateType::Burn => pool_profiler.process_burn(update),
117        _ => anyhow::bail!("Unsupported liquidity update operation {}", update.kind),
118    }
119}
120
121/// Handler for pool swap events that delegates to a [`PoolUpdater`].
122#[derive(Debug)]
123pub struct PoolSwapHandler {
124    id: Ustr,
125    updater: Rc<PoolUpdater>,
126}
127
128impl PoolSwapHandler {
129    /// Creates a new swap handler delegating to the given updater.
130    #[must_use]
131    pub fn new(updater: Rc<PoolUpdater>) -> Self {
132        Self {
133            id: Ustr::from(&format!("PoolSwapHandler-{}", updater.id())),
134            updater,
135        }
136    }
137}
138
139impl Handler<PoolSwap> for PoolSwapHandler {
140    fn id(&self) -> Ustr {
141        self.id
142    }
143
144    fn handle(&self, msg: &PoolSwap) {
145        self.updater.handle_pool_swap(msg);
146    }
147}
148
149/// Handler for pool liquidity update events that delegates to a [`PoolUpdater`].
150#[derive(Debug)]
151pub struct PoolLiquidityHandler {
152    id: Ustr,
153    updater: Rc<PoolUpdater>,
154}
155
156impl PoolLiquidityHandler {
157    /// Creates a new liquidity handler delegating to the given updater.
158    #[must_use]
159    pub fn new(updater: Rc<PoolUpdater>) -> Self {
160        Self {
161            id: Ustr::from(&format!("PoolLiquidityHandler-{}", updater.id())),
162            updater,
163        }
164    }
165}
166
167impl Handler<PoolLiquidityUpdate> for PoolLiquidityHandler {
168    fn id(&self) -> Ustr {
169        self.id
170    }
171
172    fn handle(&self, msg: &PoolLiquidityUpdate) {
173        self.updater.handle_pool_liquidity_update(msg);
174    }
175}
176
177/// Handler for pool fee collect events that delegates to a [`PoolUpdater`].
178#[derive(Debug)]
179pub struct PoolCollectHandler {
180    id: Ustr,
181    updater: Rc<PoolUpdater>,
182}
183
184impl PoolCollectHandler {
185    /// Creates a new collect handler delegating to the given updater.
186    #[must_use]
187    pub fn new(updater: Rc<PoolUpdater>) -> Self {
188        Self {
189            id: Ustr::from(&format!("PoolCollectHandler-{}", updater.id())),
190            updater,
191        }
192    }
193}
194
195impl Handler<PoolFeeCollect> for PoolCollectHandler {
196    fn id(&self) -> Ustr {
197        self.id
198    }
199
200    fn handle(&self, msg: &PoolFeeCollect) {
201        self.updater.handle_pool_fee_collect(msg);
202    }
203}
204
205/// Handler for pool flash events that delegates to a [`PoolUpdater`].
206#[derive(Debug)]
207pub struct PoolFlashHandler {
208    id: Ustr,
209    updater: Rc<PoolUpdater>,
210}
211
212impl PoolFlashHandler {
213    /// Creates a new flash handler delegating to the given updater.
214    #[must_use]
215    pub fn new(updater: Rc<PoolUpdater>) -> Self {
216        Self {
217            id: Ustr::from(&format!("PoolFlashHandler-{}", updater.id())),
218            updater,
219        }
220    }
221}
222
223impl Handler<PoolFlash> for PoolFlashHandler {
224    fn id(&self) -> Ustr {
225        self.id
226    }
227
228    fn handle(&self, msg: &PoolFlash) {
229        self.updater.handle_pool_flash(msg);
230    }
231}