nautilus_data/engine/
pool.rs1use std::{cell::RefCell, rc::Rc};
25
26use nautilus_common::{cache::Cache, msgbus::Handler};
27use nautilus_model::{
28 defi::{
29 PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolProfiler,
30 PoolSwap,
31 },
32 identifiers::InstrumentId,
33};
34use ustr::Ustr;
35
36#[derive(Debug)]
38pub struct PoolUpdater {
39 id: Ustr,
40 instrument_id: InstrumentId,
41 cache: Rc<RefCell<Cache>>,
42}
43
44impl PoolUpdater {
45 #[must_use]
47 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
48 Self {
49 id: Ustr::from(&format!("{}-{}", stringify!(PoolUpdater), instrument_id)),
50 instrument_id: *instrument_id,
51 cache,
52 }
53 }
54
55 #[must_use]
57 pub fn id(&self) -> Ustr {
58 self.id
59 }
60
61 pub fn handle_pool_swap(&self, swap: &PoolSwap) {
63 if let Some(pool_profiler) = self
64 .cache
65 .borrow_mut()
66 .pool_profiler_mut(&self.instrument_id)
67 && let Err(e) = pool_profiler.process_swap(swap)
68 {
69 log::error!("Failed to process pool swap: {e}");
70 }
71 }
72
73 pub fn handle_pool_liquidity_update(&self, update: &PoolLiquidityUpdate) {
75 if let Some(pool_profiler) = self
76 .cache
77 .borrow_mut()
78 .pool_profiler_mut(&self.instrument_id)
79 && let Err(e) = process_pool_liquidity_update(pool_profiler, update)
80 {
81 log::error!("Failed to process pool liquidity update: {e}");
82 }
83 }
84
85 pub fn handle_pool_fee_collect(&self, event: &PoolFeeCollect) {
87 if let Some(pool_profiler) = self
88 .cache
89 .borrow_mut()
90 .pool_profiler_mut(&self.instrument_id)
91 && let Err(e) = pool_profiler.process_collect(event)
92 {
93 log::error!("Failed to process pool fee collect: {e}");
94 }
95 }
96
97 pub fn handle_pool_flash(&self, event: &PoolFlash) {
99 if let Some(pool_profiler) = self
100 .cache
101 .borrow_mut()
102 .pool_profiler_mut(&self.instrument_id)
103 && let Err(e) = pool_profiler.process_flash(event)
104 {
105 log::error!("Failed to process pool flash: {e}");
106 }
107 }
108}
109
110fn process_pool_liquidity_update(
111 pool_profiler: &mut PoolProfiler,
112 update: &PoolLiquidityUpdate,
113) -> anyhow::Result<()> {
114 match update.kind {
115 PoolLiquidityUpdateType::Mint => pool_profiler.process_mint(update),
116 PoolLiquidityUpdateType::Burn => pool_profiler.process_burn(update),
117 _ => anyhow::bail!("Unsupported liquidity update operation {}", update.kind),
118 }
119}
120
121#[derive(Debug)]
123pub struct PoolSwapHandler {
124 id: Ustr,
125 updater: Rc<PoolUpdater>,
126}
127
128impl PoolSwapHandler {
129 #[must_use]
131 pub fn new(updater: Rc<PoolUpdater>) -> Self {
132 Self {
133 id: Ustr::from(&format!("PoolSwapHandler-{}", updater.id())),
134 updater,
135 }
136 }
137}
138
139impl Handler<PoolSwap> for PoolSwapHandler {
140 fn id(&self) -> Ustr {
141 self.id
142 }
143
144 fn handle(&self, msg: &PoolSwap) {
145 self.updater.handle_pool_swap(msg);
146 }
147}
148
149#[derive(Debug)]
151pub struct PoolLiquidityHandler {
152 id: Ustr,
153 updater: Rc<PoolUpdater>,
154}
155
156impl PoolLiquidityHandler {
157 #[must_use]
159 pub fn new(updater: Rc<PoolUpdater>) -> Self {
160 Self {
161 id: Ustr::from(&format!("PoolLiquidityHandler-{}", updater.id())),
162 updater,
163 }
164 }
165}
166
167impl Handler<PoolLiquidityUpdate> for PoolLiquidityHandler {
168 fn id(&self) -> Ustr {
169 self.id
170 }
171
172 fn handle(&self, msg: &PoolLiquidityUpdate) {
173 self.updater.handle_pool_liquidity_update(msg);
174 }
175}
176
177#[derive(Debug)]
179pub struct PoolCollectHandler {
180 id: Ustr,
181 updater: Rc<PoolUpdater>,
182}
183
184impl PoolCollectHandler {
185 #[must_use]
187 pub fn new(updater: Rc<PoolUpdater>) -> Self {
188 Self {
189 id: Ustr::from(&format!("PoolCollectHandler-{}", updater.id())),
190 updater,
191 }
192 }
193}
194
195impl Handler<PoolFeeCollect> for PoolCollectHandler {
196 fn id(&self) -> Ustr {
197 self.id
198 }
199
200 fn handle(&self, msg: &PoolFeeCollect) {
201 self.updater.handle_pool_fee_collect(msg);
202 }
203}
204
205#[derive(Debug)]
207pub struct PoolFlashHandler {
208 id: Ustr,
209 updater: Rc<PoolUpdater>,
210}
211
212impl PoolFlashHandler {
213 #[must_use]
215 pub fn new(updater: Rc<PoolUpdater>) -> Self {
216 Self {
217 id: Ustr::from(&format!("PoolFlashHandler-{}", updater.id())),
218 updater,
219 }
220 }
221}
222
223impl Handler<PoolFlash> for PoolFlashHandler {
224 fn id(&self) -> Ustr {
225 self.id
226 }
227
228 fn handle(&self, msg: &PoolFlash) {
229 self.updater.handle_pool_flash(msg);
230 }
231}