1use std::{rc::Rc, sync::Arc};
22
23use nautilus_common::{
24 defi,
25 messages::defi::{
26 DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
27 },
28 msgbus::{self, TypedHandler},
29};
30use nautilus_core::UUID4;
31use nautilus_model::{
32 defi::{
33 Blockchain, DefiData, PoolProfiler,
34 data::{DexPoolData, block::BlockPosition},
35 },
36 identifiers::{ClientId, InstrumentId},
37};
38
39use crate::engine::{
40 DataEngine,
41 pool::{
42 PoolCollectHandler, PoolFlashHandler, PoolLiquidityHandler, PoolSwapHandler, PoolUpdater,
43 },
44};
45
46fn get_event_block_position(event: &DexPoolData) -> (u64, u32, u32) {
48 match event {
49 DexPoolData::Swap(s) => (s.block, s.transaction_index, s.log_index),
50 DexPoolData::LiquidityUpdate(u) => (u.block, u.transaction_index, u.log_index),
51 DexPoolData::FeeCollect(c) => (c.block, c.transaction_index, c.log_index),
52 DexPoolData::Flash(f) => (f.block, f.transaction_index, f.log_index),
53 }
54}
55
56fn convert_and_sort_buffered_events(buffered_events: Vec<DefiData>) -> Vec<DexPoolData> {
58 let mut events: Vec<DexPoolData> = buffered_events
59 .into_iter()
60 .filter_map(|event| match event {
61 DefiData::PoolSwap(swap) => Some(DexPoolData::Swap(swap)),
62 DefiData::PoolLiquidityUpdate(update) => Some(DexPoolData::LiquidityUpdate(update)),
63 DefiData::PoolFeeCollect(collect) => Some(DexPoolData::FeeCollect(collect)),
64 DefiData::PoolFlash(flash) => Some(DexPoolData::Flash(flash)),
65 _ => None,
66 })
67 .collect();
68
69 events.sort_by(|a, b| {
70 let pos_a = get_event_block_position(a);
71 let pos_b = get_event_block_position(b);
72 pos_a.cmp(&pos_b)
73 });
74
75 events
76}
77
78impl DataEngine {
79 #[must_use]
81 pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
82 self.collect_subscriptions(|client| &client.subscriptions_blocks)
83 }
84
85 #[must_use]
87 pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
88 self.collect_subscriptions(|client| &client.subscriptions_pools)
89 }
90
91 #[must_use]
93 pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
94 self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
95 }
96
97 #[must_use]
99 pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
100 self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
101 }
102
103 #[must_use]
105 pub fn subscribed_pool_fee_collects(&self) -> Vec<InstrumentId> {
106 self.collect_subscriptions(|client| &client.subscriptions_pool_fee_collects)
107 }
108
109 #[must_use]
111 pub fn subscribed_pool_flash(&self) -> Vec<InstrumentId> {
112 self.collect_subscriptions(|client| &client.subscriptions_pool_flash)
113 }
114
115 pub fn execute_defi_subscribe(&mut self, cmd: DefiSubscribeCommand) -> anyhow::Result<()> {
122 if let Some(client_id) = cmd.client_id()
123 && self.external_clients.contains(client_id)
124 {
125 if self.config.debug {
126 log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}");
127 }
128 return Ok(());
129 }
130
131 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
132 log::info!("Forwarding subscription to client {}", client.client_id);
133 client.execute_defi_subscribe(cmd.clone());
134 } else {
135 log::error!(
136 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
137 cmd.client_id(),
138 cmd.venue(),
139 );
140 }
141
142 match cmd {
143 DefiSubscribeCommand::Pool(cmd) => {
144 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
145 }
146 DefiSubscribeCommand::PoolSwaps(cmd) => {
147 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
148 }
149 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
150 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
151 }
152 DefiSubscribeCommand::PoolFeeCollects(cmd) => {
153 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
154 }
155 DefiSubscribeCommand::PoolFlashEvents(cmd) => {
156 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
157 }
158 DefiSubscribeCommand::Blocks(_) => {} }
160
161 Ok(())
162 }
163
164 pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
170 if let Some(client_id) = cmd.client_id()
171 && self.external_clients.contains(client_id)
172 {
173 if self.config.debug {
174 log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}");
175 }
176 return Ok(());
177 }
178
179 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
180 client.execute_defi_unsubscribe(cmd);
181 } else {
182 log::error!(
183 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
184 cmd.client_id(),
185 cmd.venue(),
186 );
187 }
188
189 Ok(())
190 }
191
192 pub fn execute_defi_request(&mut self, req: DefiRequestCommand) -> anyhow::Result<()> {
199 if let Some(cid) = req.client_id()
201 && self.external_clients.contains(cid)
202 {
203 if self.config.debug {
204 log::debug!("Skipping defi data request for external client {cid}: {req:?}");
205 }
206 return Ok(());
207 }
208
209 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
210 client.execute_defi_request(req)
211 } else {
212 anyhow::bail!(
213 "Cannot handle request: no client found for {:?} {:?}",
214 req.client_id(),
215 req.venue()
216 );
217 }
218 }
219
220 pub fn process_defi_data(&mut self, data: DefiData) {
222 self.increment_data_count();
223
224 match data {
225 DefiData::Block(block) => {
226 let topic = defi::switchboard::get_defi_blocks_topic(block.chain());
227 msgbus::publish_defi_block(topic, &block);
228 }
229 DefiData::Pool(pool) => {
230 if let Err(e) = self.cache.borrow_mut().add_pool(pool.clone()) {
231 log::error!("Failed to add Pool to cache: {e}");
232 }
233
234 if self.pool_updaters_pending.contains(&pool.instrument_id) {
239 if self.pool_snapshot_pending.contains(&pool.instrument_id) {
240 log::debug!(
241 "Pool {} loaded; deferring profiler creation to snapshot handler",
242 pool.instrument_id
243 );
244 } else {
245 self.pool_updaters_pending.remove(&pool.instrument_id);
246 log::info!(
247 "Pool {} now loaded, creating deferred pool profiler",
248 pool.instrument_id
249 );
250 self.setup_pool_updater(&pool.instrument_id, None);
251 }
252 }
253
254 let topic = defi::switchboard::get_defi_pool_topic(pool.instrument_id);
255 msgbus::publish_defi_pool(topic, &pool);
256 }
257 DefiData::PoolSnapshot(snapshot) => {
258 let instrument_id = snapshot.instrument_id;
259 log::info!(
260 "Received pool snapshot for {instrument_id} at block {} with {} positions and {} ticks",
261 snapshot.block_position.number,
262 snapshot.positions.len(),
263 snapshot.ticks.len()
264 );
265
266 if !self.pool_snapshot_pending.contains(&instrument_id) {
268 log::warn!(
269 "Received unexpected pool snapshot for {instrument_id} (not in pending set)"
270 );
271 return;
272 }
273
274 let pool = match self.cache.borrow().pool(&instrument_id) {
276 Some(pool) => Arc::new(pool.clone()),
277 None => {
278 log::error!(
279 "Pool {instrument_id} not found in cache when processing snapshot"
280 );
281 return;
282 }
283 };
284
285 if snapshot.positions.is_empty()
290 && snapshot.ticks.is_empty()
291 && snapshot.block_position.number == pool.creation_block
292 {
293 log::warn!(
294 "Refusing empty stub snapshot for {instrument_id} at pool creation block {}; pool will remain without profiler",
295 snapshot.block_position.number,
296 );
297 self.pool_snapshot_pending.remove(&instrument_id);
298 self.pool_updaters_pending.remove(&instrument_id);
299 self.pool_event_buffers.remove(&instrument_id);
300 return;
301 }
302
303 let mut profiler = PoolProfiler::new(pool);
305 if let Err(e) = profiler.restore_from_snapshot(snapshot.clone()) {
306 log::error!(
307 "Failed to restore profiler from snapshot for {instrument_id}: {e}"
308 );
309 return;
310 }
311 log::debug!("Restored pool profiler for {instrument_id} from snapshot");
312
313 let buffered_events = self
315 .pool_event_buffers
316 .remove(&instrument_id)
317 .unwrap_or_default();
318
319 if !buffered_events.is_empty() {
320 log::info!(
321 "Processing {} buffered events for {instrument_id}",
322 buffered_events.len()
323 );
324
325 let events_to_apply = convert_and_sort_buffered_events(buffered_events);
326 let applied_count = Self::apply_buffered_events_to_profiler(
327 &mut profiler,
328 events_to_apply,
329 &snapshot.block_position,
330 instrument_id,
331 );
332
333 log::info!(
334 "Applied {applied_count} buffered events to profiler for {instrument_id}"
335 );
336 }
337
338 if let Err(e) = self.cache.borrow_mut().add_pool_profiler(profiler) {
340 log::error!("Failed to add pool profiler to cache for {instrument_id}: {e}");
341 return;
342 }
343
344 self.pool_snapshot_pending.remove(&instrument_id);
346 self.pool_updaters_pending.remove(&instrument_id);
347 let updater = Rc::new(PoolUpdater::new(&instrument_id, self.cache.clone()));
348
349 self.subscribe_pool_updater_topics(instrument_id, updater.clone());
350 self.pool_updaters.insert(instrument_id, updater);
351
352 log::info!(
353 "Pool profiler setup completed for {instrument_id}, now processing live events"
354 );
355 }
356 DefiData::PoolSwap(swap) => {
357 let instrument_id = swap.instrument_id;
358 if self.pool_snapshot_pending.contains(&instrument_id) {
360 log::debug!("Buffering swap event for {instrument_id} (waiting for snapshot)");
361 self.pool_event_buffers
362 .entry(instrument_id)
363 .or_default()
364 .push(DefiData::PoolSwap(swap));
365 } else {
366 let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
367 msgbus::publish_defi_swap(topic, &swap);
368 }
369 }
370 DefiData::PoolLiquidityUpdate(update) => {
371 let instrument_id = update.instrument_id;
372 if self.pool_snapshot_pending.contains(&instrument_id) {
374 log::debug!(
375 "Buffering liquidity update event for {instrument_id} (waiting for snapshot)"
376 );
377 self.pool_event_buffers
378 .entry(instrument_id)
379 .or_default()
380 .push(DefiData::PoolLiquidityUpdate(update));
381 } else {
382 let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
383 msgbus::publish_defi_liquidity(topic, &update);
384 }
385 }
386 DefiData::PoolFeeCollect(collect) => {
387 let instrument_id = collect.instrument_id;
388 if self.pool_snapshot_pending.contains(&instrument_id) {
390 log::debug!(
391 "Buffering fee collect event for {instrument_id} (waiting for snapshot)"
392 );
393 self.pool_event_buffers
394 .entry(instrument_id)
395 .or_default()
396 .push(DefiData::PoolFeeCollect(collect));
397 } else {
398 let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
399 msgbus::publish_defi_collect(topic, &collect);
400 }
401 }
402 DefiData::PoolFlash(flash) => {
403 let instrument_id = flash.instrument_id;
404 if self.pool_snapshot_pending.contains(&instrument_id) {
406 log::debug!("Buffering flash event for {instrument_id} (waiting for snapshot)");
407 self.pool_event_buffers
408 .entry(instrument_id)
409 .or_default()
410 .push(DefiData::PoolFlash(flash));
411 } else {
412 let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
413 msgbus::publish_defi_flash(topic, &flash);
414 }
415 }
416 }
417 }
418
419 fn subscribe_pool_updater_topics(&self, instrument_id: InstrumentId, updater: Rc<PoolUpdater>) {
421 let priority = Some(self.msgbus_priority);
422
423 let swap_topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
425 let swap_handler = TypedHandler(Rc::new(PoolSwapHandler::new(updater.clone())));
426 msgbus::subscribe_defi_swaps(swap_topic.into(), swap_handler, priority);
427
428 let liq_topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
430 let liq_handler = TypedHandler(Rc::new(PoolLiquidityHandler::new(updater.clone())));
431 msgbus::subscribe_defi_liquidity(liq_topic.into(), liq_handler, priority);
432
433 let collect_topic = defi::switchboard::get_defi_collect_topic(instrument_id);
435 let collect_handler = TypedHandler(Rc::new(PoolCollectHandler::new(updater.clone())));
436 msgbus::subscribe_defi_collects(collect_topic.into(), collect_handler, priority);
437
438 let flash_topic = defi::switchboard::get_defi_flash_topic(instrument_id);
440 let flash_handler = TypedHandler(Rc::new(PoolFlashHandler::new(updater)));
441 msgbus::subscribe_defi_flash(flash_topic.into(), flash_handler, priority);
442 }
443
444 fn apply_buffered_events_to_profiler(
448 profiler: &mut PoolProfiler,
449 events: Vec<DexPoolData>,
450 snapshot_block: &BlockPosition,
451 instrument_id: InstrumentId,
452 ) -> usize {
453 let mut applied_count = 0;
454
455 for event in events {
456 let event_block = get_event_block_position(&event);
457
458 let is_after_snapshot = event_block.0 > snapshot_block.number
460 || (event_block.0 == snapshot_block.number
461 && event_block.1 > snapshot_block.transaction_index)
462 || (event_block.0 == snapshot_block.number
463 && event_block.1 == snapshot_block.transaction_index
464 && event_block.2 > snapshot_block.log_index);
465
466 if is_after_snapshot {
467 if let Err(e) = profiler.process(&event) {
468 log::error!(
469 "Failed to apply buffered event to profiler for {instrument_id}: {e}"
470 );
471 } else {
472 applied_count += 1;
473 }
474 }
475 }
476
477 applied_count
478 }
479
480 fn setup_pool_updater(&mut self, instrument_id: &InstrumentId, client_id: Option<&ClientId>) {
481 if self.pool_updaters.contains_key(instrument_id)
483 || self.pool_updaters_pending.contains(instrument_id)
484 {
485 log::debug!("Pool updater for {instrument_id} already exists");
486 return;
487 }
488
489 log::info!("Setting up pool updater for {instrument_id}");
490
491 {
493 let mut cache = self.cache.borrow_mut();
494
495 if cache.pool_profiler(instrument_id).is_some() {
496 log::debug!("Pool profiler already exists for {instrument_id}");
498 } else if let Some(pool) = cache.pool(instrument_id) {
499 let pool = Arc::new(pool.clone());
501 let mut pool_profiler = PoolProfiler::new(pool.clone());
502
503 if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
504 if let Err(e) = pool_profiler.initialize(initial_sqrt_price_x96) {
505 log::error!("Failed to initialize pool profiler for {instrument_id}: {e}");
506 drop(cache);
507 return;
508 }
509 log::debug!(
510 "Initialized pool profiler for {instrument_id} with sqrt_price {initial_sqrt_price_x96}"
511 );
512 } else {
513 log::debug!("Created pool profiler for {instrument_id}");
514 }
515
516 if let Err(e) = cache.add_pool_profiler(pool_profiler) {
517 log::error!("Failed to add pool profiler for {instrument_id}: {e}");
518 drop(cache);
519 return;
520 }
521 drop(cache);
522 } else {
523 drop(cache);
525
526 let request_id = UUID4::new();
527 let ts_init = self.clock.borrow().timestamp_ns();
528 let request = RequestPoolSnapshot::new(
529 *instrument_id,
530 client_id.copied(),
531 request_id,
532 ts_init,
533 None,
534 );
535
536 if let Err(e) = self.execute_defi_request(DefiRequestCommand::PoolSnapshot(request))
537 {
538 log::warn!("Failed to request pool snapshot for {instrument_id}: {e}");
539 } else {
540 log::debug!("Requested pool snapshot for {instrument_id}");
541 self.pool_snapshot_pending.insert(*instrument_id);
542 self.pool_updaters_pending.insert(*instrument_id);
543 self.pool_event_buffers.entry(*instrument_id).or_default();
544 }
545 return;
546 }
547 }
548
549 let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
551
552 self.subscribe_pool_updater_topics(*instrument_id, updater.clone());
553 self.pool_updaters.insert(*instrument_id, updater);
554
555 log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
556 }
557}
558
559#[cfg(test)]
560mod tests {
561 use std::sync::Arc;
562
563 use alloy_primitives::{Address, I256, U160, U256};
564 use nautilus_core::UnixNanos;
565 use nautilus_model::{
566 defi::{
567 Chain, DefiData, PoolFeeCollect, PoolFlash, PoolIdentifier, PoolLiquidityUpdate,
568 PoolLiquidityUpdateType, PoolSwap,
569 chain::chains,
570 data::DexPoolData,
571 dex::{AmmType, Dex, DexType},
572 },
573 identifiers::{InstrumentId, Symbol, Venue},
574 };
575 use rstest::*;
576
577 use super::*;
578
579 #[fixture]
580 fn test_instrument_id() -> InstrumentId {
581 InstrumentId::new(Symbol::from("ETH/USDC"), Venue::from("UNISWAPV3"))
582 }
583
584 #[fixture]
585 fn test_chain() -> Arc<Chain> {
586 Arc::new(chains::ETHEREUM.clone())
587 }
588
589 #[fixture]
590 fn test_dex(test_chain: Arc<Chain>) -> Arc<Dex> {
591 Arc::new(Dex::new(
592 (*test_chain).clone(),
593 DexType::UniswapV3,
594 "0x1F98431c8aD98523631AE4a59f267346ea31F984",
595 12369621,
596 AmmType::CLAMM,
597 "PoolCreated(address,address,uint24,int24,address)",
598 "Swap(address,address,int256,int256,uint160,uint128,int24)",
599 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
600 "Burn(address,int24,int24,uint128,uint256,uint256)",
601 "Collect(address,address,int24,int24,uint128,uint128)",
602 ))
603 }
604
605 fn create_test_swap(
606 test_instrument_id: InstrumentId,
607 test_chain: Arc<Chain>,
608 test_dex: Arc<Dex>,
609 block: u64,
610 tx_index: u32,
611 log_index: u32,
612 ) -> PoolSwap {
613 PoolSwap::new(
614 test_chain,
615 test_dex,
616 test_instrument_id,
617 PoolIdentifier::from_address(Address::ZERO),
618 block,
619 format!("0x{block:064x}"),
620 tx_index,
621 log_index,
622 UnixNanos::default(),
623 UnixNanos::default(),
624 Address::ZERO,
625 Address::ZERO,
626 I256::ZERO,
627 I256::ZERO,
628 U160::ZERO,
629 0,
630 0,
631 )
632 }
633
634 fn create_test_liquidity_update(
635 test_instrument_id: InstrumentId,
636 test_chain: Arc<Chain>,
637 test_dex: Arc<Dex>,
638 block: u64,
639 tx_index: u32,
640 log_index: u32,
641 ) -> PoolLiquidityUpdate {
642 PoolLiquidityUpdate::new(
643 test_chain,
644 test_dex,
645 test_instrument_id,
646 PoolIdentifier::from_address(Address::ZERO),
647 PoolLiquidityUpdateType::Mint,
648 block,
649 format!("0x{block:064x}"),
650 tx_index,
651 log_index,
652 None,
653 Address::ZERO,
654 0,
655 U256::ZERO,
656 U256::ZERO,
657 0,
658 0,
659 UnixNanos::default(),
660 UnixNanos::default(),
661 )
662 }
663
664 fn create_test_fee_collect(
665 test_instrument_id: InstrumentId,
666 test_chain: Arc<Chain>,
667 test_dex: Arc<Dex>,
668 block: u64,
669 tx_index: u32,
670 log_index: u32,
671 ) -> PoolFeeCollect {
672 PoolFeeCollect::new(
673 test_chain,
674 test_dex,
675 test_instrument_id,
676 PoolIdentifier::from_address(Address::ZERO),
677 block,
678 format!("0x{block:064x}"),
679 tx_index,
680 log_index,
681 Address::ZERO,
682 0,
683 0,
684 0,
685 0,
686 UnixNanos::default(),
687 UnixNanos::default(),
688 )
689 }
690
691 fn create_test_flash(
692 test_instrument_id: InstrumentId,
693 test_chain: Arc<Chain>,
694 test_dex: Arc<Dex>,
695 block: u64,
696 tx_index: u32,
697 log_index: u32,
698 ) -> PoolFlash {
699 PoolFlash::new(
700 test_chain,
701 test_dex,
702 test_instrument_id,
703 PoolIdentifier::from_address(Address::ZERO),
704 block,
705 format!("0x{block:064x}"),
706 tx_index,
707 log_index,
708 UnixNanos::default(),
709 UnixNanos::default(),
710 Address::ZERO,
711 Address::ZERO,
712 U256::ZERO,
713 U256::ZERO,
714 U256::ZERO,
715 U256::ZERO,
716 )
717 }
718
719 #[rstest]
720 fn test_get_event_block_position_swap(
721 test_instrument_id: InstrumentId,
722 test_chain: Arc<Chain>,
723 test_dex: Arc<Dex>,
724 ) {
725 let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
726 let pos = get_event_block_position(&DexPoolData::Swap(swap));
727 assert_eq!(pos, (100, 5, 3));
728 }
729
730 #[rstest]
731 fn test_get_event_block_position_liquidity_update(
732 test_instrument_id: InstrumentId,
733 test_chain: Arc<Chain>,
734 test_dex: Arc<Dex>,
735 ) {
736 let update =
737 create_test_liquidity_update(test_instrument_id, test_chain, test_dex, 200, 10, 7);
738 let pos = get_event_block_position(&DexPoolData::LiquidityUpdate(update));
739 assert_eq!(pos, (200, 10, 7));
740 }
741
742 #[rstest]
743 fn test_get_event_block_position_fee_collect(
744 test_instrument_id: InstrumentId,
745 test_chain: Arc<Chain>,
746 test_dex: Arc<Dex>,
747 ) {
748 let collect = create_test_fee_collect(test_instrument_id, test_chain, test_dex, 300, 15, 2);
749 let pos = get_event_block_position(&DexPoolData::FeeCollect(collect));
750 assert_eq!(pos, (300, 15, 2));
751 }
752
753 #[rstest]
754 fn test_get_event_block_position_flash(
755 test_instrument_id: InstrumentId,
756 test_chain: Arc<Chain>,
757 test_dex: Arc<Dex>,
758 ) {
759 let flash = create_test_flash(test_instrument_id, test_chain, test_dex, 400, 20, 8);
760 let pos = get_event_block_position(&DexPoolData::Flash(flash));
761 assert_eq!(pos, (400, 20, 8));
762 }
763
764 #[rstest]
765 fn test_convert_and_sort_empty_events() {
766 let events = convert_and_sort_buffered_events(vec![]);
767 assert!(events.is_empty());
768 }
769
770 #[rstest]
771 fn test_convert_and_sort_filters_non_pool_events(
772 test_instrument_id: InstrumentId,
773 test_chain: Arc<Chain>,
774 test_dex: Arc<Dex>,
775 ) {
776 let events = vec![
777 DefiData::PoolSwap(create_test_swap(
778 test_instrument_id,
779 test_chain,
780 test_dex,
781 100,
782 0,
783 0,
784 )),
785 ];
787 let sorted = convert_and_sort_buffered_events(events);
788 assert_eq!(sorted.len(), 1);
789 }
790
791 #[rstest]
792 fn test_convert_and_sort_single_event(
793 test_instrument_id: InstrumentId,
794 test_chain: Arc<Chain>,
795 test_dex: Arc<Dex>,
796 ) {
797 let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
798 let events = vec![DefiData::PoolSwap(swap)];
799 let sorted = convert_and_sort_buffered_events(events);
800 assert_eq!(sorted.len(), 1);
801 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 3));
802 }
803
804 #[rstest]
805 fn test_convert_and_sort_already_sorted(
806 test_instrument_id: InstrumentId,
807 test_chain: Arc<Chain>,
808 test_dex: Arc<Dex>,
809 ) {
810 let events = vec![
811 DefiData::PoolSwap(create_test_swap(
812 test_instrument_id,
813 test_chain.clone(),
814 test_dex.clone(),
815 100,
816 0,
817 0,
818 )),
819 DefiData::PoolSwap(create_test_swap(
820 test_instrument_id,
821 test_chain.clone(),
822 test_dex.clone(),
823 100,
824 0,
825 1,
826 )),
827 DefiData::PoolSwap(create_test_swap(
828 test_instrument_id,
829 test_chain,
830 test_dex,
831 100,
832 1,
833 0,
834 )),
835 ];
836 let sorted = convert_and_sort_buffered_events(events);
837 assert_eq!(sorted.len(), 3);
838 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
839 assert_eq!(get_event_block_position(&sorted[1]), (100, 0, 1));
840 assert_eq!(get_event_block_position(&sorted[2]), (100, 1, 0));
841 }
842
843 #[rstest]
844 fn test_convert_and_sort_reverse_order(
845 test_instrument_id: InstrumentId,
846 test_chain: Arc<Chain>,
847 test_dex: Arc<Dex>,
848 ) {
849 let events = vec![
850 DefiData::PoolSwap(create_test_swap(
851 test_instrument_id,
852 test_chain.clone(),
853 test_dex.clone(),
854 100,
855 2,
856 5,
857 )),
858 DefiData::PoolSwap(create_test_swap(
859 test_instrument_id,
860 test_chain.clone(),
861 test_dex.clone(),
862 100,
863 1,
864 3,
865 )),
866 DefiData::PoolSwap(create_test_swap(
867 test_instrument_id,
868 test_chain,
869 test_dex,
870 100,
871 0,
872 1,
873 )),
874 ];
875 let sorted = convert_and_sort_buffered_events(events);
876 assert_eq!(sorted.len(), 3);
877 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 1));
878 assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 3));
879 assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 5));
880 }
881
882 #[rstest]
883 fn test_convert_and_sort_mixed_blocks(
884 test_instrument_id: InstrumentId,
885 test_chain: Arc<Chain>,
886 test_dex: Arc<Dex>,
887 ) {
888 let events = vec![
889 DefiData::PoolSwap(create_test_swap(
890 test_instrument_id,
891 test_chain.clone(),
892 test_dex.clone(),
893 102,
894 0,
895 0,
896 )),
897 DefiData::PoolSwap(create_test_swap(
898 test_instrument_id,
899 test_chain.clone(),
900 test_dex.clone(),
901 100,
902 5,
903 2,
904 )),
905 DefiData::PoolSwap(create_test_swap(
906 test_instrument_id,
907 test_chain,
908 test_dex,
909 101,
910 3,
911 1,
912 )),
913 ];
914 let sorted = convert_and_sort_buffered_events(events);
915 assert_eq!(sorted.len(), 3);
916 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 2));
917 assert_eq!(get_event_block_position(&sorted[1]), (101, 3, 1));
918 assert_eq!(get_event_block_position(&sorted[2]), (102, 0, 0));
919 }
920
921 #[rstest]
922 fn test_convert_and_sort_mixed_event_types(
923 test_instrument_id: InstrumentId,
924 test_chain: Arc<Chain>,
925 test_dex: Arc<Dex>,
926 ) {
927 let events = vec![
928 DefiData::PoolSwap(create_test_swap(
929 test_instrument_id,
930 test_chain.clone(),
931 test_dex.clone(),
932 100,
933 2,
934 0,
935 )),
936 DefiData::PoolLiquidityUpdate(create_test_liquidity_update(
937 test_instrument_id,
938 test_chain.clone(),
939 test_dex.clone(),
940 100,
941 0,
942 0,
943 )),
944 DefiData::PoolFeeCollect(create_test_fee_collect(
945 test_instrument_id,
946 test_chain.clone(),
947 test_dex.clone(),
948 100,
949 1,
950 0,
951 )),
952 DefiData::PoolFlash(create_test_flash(
953 test_instrument_id,
954 test_chain,
955 test_dex,
956 100,
957 3,
958 0,
959 )),
960 ];
961 let sorted = convert_and_sort_buffered_events(events);
962 assert_eq!(sorted.len(), 4);
963 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
964 assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 0));
965 assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 0));
966 assert_eq!(get_event_block_position(&sorted[3]), (100, 3, 0));
967 }
968
969 #[rstest]
970 fn test_convert_and_sort_same_block_and_tx_different_log_index(
971 test_instrument_id: InstrumentId,
972 test_chain: Arc<Chain>,
973 test_dex: Arc<Dex>,
974 ) {
975 let events = vec![
976 DefiData::PoolSwap(create_test_swap(
977 test_instrument_id,
978 test_chain.clone(),
979 test_dex.clone(),
980 100,
981 5,
982 10,
983 )),
984 DefiData::PoolSwap(create_test_swap(
985 test_instrument_id,
986 test_chain.clone(),
987 test_dex.clone(),
988 100,
989 5,
990 5,
991 )),
992 DefiData::PoolSwap(create_test_swap(
993 test_instrument_id,
994 test_chain,
995 test_dex,
996 100,
997 5,
998 1,
999 )),
1000 ];
1001 let sorted = convert_and_sort_buffered_events(events);
1002 assert_eq!(sorted.len(), 3);
1003 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 1));
1004 assert_eq!(get_event_block_position(&sorted[1]), (100, 5, 5));
1005 assert_eq!(get_event_block_position(&sorted[2]), (100, 5, 10));
1006 }
1007}