nautilus_data/engine/
pool.rs1use std::{cell::RefCell, rc::Rc};
25
26use nautilus_common::{cache::Cache, msgbus::Handler};
27use nautilus_model::{
28 defi::{PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap},
29 identifiers::InstrumentId,
30};
31use ustr::Ustr;
32
33#[derive(Debug)]
35pub struct PoolUpdater {
36 id: Ustr,
37 instrument_id: InstrumentId,
38 cache: Rc<RefCell<Cache>>,
39}
40
41impl PoolUpdater {
42 #[must_use]
44 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
45 Self {
46 id: Ustr::from(&format!("{}-{}", stringify!(PoolUpdater), instrument_id)),
47 instrument_id: *instrument_id,
48 cache,
49 }
50 }
51
52 #[must_use]
54 pub fn id(&self) -> Ustr {
55 self.id
56 }
57
58 pub fn handle_pool_swap(&self, swap: &PoolSwap) {
60 if let Some(pool_profiler) = self
61 .cache
62 .borrow_mut()
63 .pool_profiler_mut(&self.instrument_id)
64 && let Err(e) = pool_profiler.process_swap(swap)
65 {
66 log::error!("Failed to process pool swap: {e}");
67 }
68 }
69
70 pub fn handle_pool_liquidity_update(&self, update: &PoolLiquidityUpdate) {
76 if let Some(pool_profiler) = self
77 .cache
78 .borrow_mut()
79 .pool_profiler_mut(&self.instrument_id)
80 && let Err(e) = match update.kind {
81 PoolLiquidityUpdateType::Mint => pool_profiler.process_mint(update),
82 PoolLiquidityUpdateType::Burn => pool_profiler.process_burn(update),
83 _ => panic!("Liquidity update operation {} not implemented", update.kind),
84 }
85 {
86 log::error!("Failed to process pool liquidity update: {e}");
87 }
88 }
89
90 pub fn handle_pool_fee_collect(&self, event: &PoolFeeCollect) {
92 if let Some(pool_profiler) = self
93 .cache
94 .borrow_mut()
95 .pool_profiler_mut(&self.instrument_id)
96 && let Err(e) = pool_profiler.process_collect(event)
97 {
98 log::error!("Failed to process pool fee collect: {e}");
99 }
100 }
101
102 pub fn handle_pool_flash(&self, event: &PoolFlash) {
104 if let Some(pool_profiler) = self
105 .cache
106 .borrow_mut()
107 .pool_profiler_mut(&self.instrument_id)
108 && let Err(e) = pool_profiler.process_flash(event)
109 {
110 log::error!("Failed to process pool flash: {e}");
111 }
112 }
113}
114
115#[derive(Debug)]
117pub struct PoolSwapHandler {
118 id: Ustr,
119 updater: Rc<PoolUpdater>,
120}
121
122impl PoolSwapHandler {
123 #[must_use]
125 pub fn new(updater: Rc<PoolUpdater>) -> Self {
126 Self {
127 id: Ustr::from(&format!("PoolSwapHandler-{}", updater.id())),
128 updater,
129 }
130 }
131}
132
133impl Handler<PoolSwap> for PoolSwapHandler {
134 fn id(&self) -> Ustr {
135 self.id
136 }
137
138 fn handle(&self, msg: &PoolSwap) {
139 self.updater.handle_pool_swap(msg);
140 }
141}
142
143#[derive(Debug)]
145pub struct PoolLiquidityHandler {
146 id: Ustr,
147 updater: Rc<PoolUpdater>,
148}
149
150impl PoolLiquidityHandler {
151 #[must_use]
153 pub fn new(updater: Rc<PoolUpdater>) -> Self {
154 Self {
155 id: Ustr::from(&format!("PoolLiquidityHandler-{}", updater.id())),
156 updater,
157 }
158 }
159}
160
161impl Handler<PoolLiquidityUpdate> for PoolLiquidityHandler {
162 fn id(&self) -> Ustr {
163 self.id
164 }
165
166 fn handle(&self, msg: &PoolLiquidityUpdate) {
167 self.updater.handle_pool_liquidity_update(msg);
168 }
169}
170
171#[derive(Debug)]
173pub struct PoolCollectHandler {
174 id: Ustr,
175 updater: Rc<PoolUpdater>,
176}
177
178impl PoolCollectHandler {
179 #[must_use]
181 pub fn new(updater: Rc<PoolUpdater>) -> Self {
182 Self {
183 id: Ustr::from(&format!("PoolCollectHandler-{}", updater.id())),
184 updater,
185 }
186 }
187}
188
189impl Handler<PoolFeeCollect> for PoolCollectHandler {
190 fn id(&self) -> Ustr {
191 self.id
192 }
193
194 fn handle(&self, msg: &PoolFeeCollect) {
195 self.updater.handle_pool_fee_collect(msg);
196 }
197}
198
199#[derive(Debug)]
201pub struct PoolFlashHandler {
202 id: Ustr,
203 updater: Rc<PoolUpdater>,
204}
205
206impl PoolFlashHandler {
207 #[must_use]
209 pub fn new(updater: Rc<PoolUpdater>) -> Self {
210 Self {
211 id: Ustr::from(&format!("PoolFlashHandler-{}", updater.id())),
212 updater,
213 }
214 }
215}
216
217impl Handler<PoolFlash> for PoolFlashHandler {
218 fn id(&self) -> Ustr {
219 self.id
220 }
221
222 fn handle(&self, msg: &PoolFlash) {
223 self.updater.handle_pool_flash(msg);
224 }
225}