Skip to main content

nautilus_data/defi/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! DeFi-specific data engine functionality.
17//!
18//! This module provides DeFi processing methods for the `DataEngine`.
19//! All code in this module requires the `defi` feature flag.
20
21use std::{rc::Rc, sync::Arc};
22
23use nautilus_common::{
24    defi,
25    messages::defi::{
26        DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
27    },
28    msgbus::{self, TypedHandler},
29};
30use nautilus_core::UUID4;
31use nautilus_model::{
32    defi::{
33        Blockchain, DefiData, PoolProfiler,
34        data::{DexPoolData, block::BlockPosition},
35    },
36    identifiers::{ClientId, InstrumentId},
37};
38
39use crate::engine::{
40    DataEngine,
41    pool::{
42        PoolCollectHandler, PoolFlashHandler, PoolLiquidityHandler, PoolSwapHandler, PoolUpdater,
43    },
44};
45
46/// Extracts the block position tuple from a `DexPoolData` event.
47fn get_event_block_position(event: &DexPoolData) -> (u64, u32, u32) {
48    match event {
49        DexPoolData::Swap(s) => (s.block, s.transaction_index, s.log_index),
50        DexPoolData::LiquidityUpdate(u) => (u.block, u.transaction_index, u.log_index),
51        DexPoolData::FeeCollect(c) => (c.block, c.transaction_index, c.log_index),
52        DexPoolData::Flash(f) => (f.block, f.transaction_index, f.log_index),
53    }
54}
55
56/// Converts buffered `DefiData` events to `DexPoolData` and sorts by block position.
57fn convert_and_sort_buffered_events(buffered_events: Vec<DefiData>) -> Vec<DexPoolData> {
58    let mut events: Vec<DexPoolData> = buffered_events
59        .into_iter()
60        .filter_map(|event| match event {
61            DefiData::PoolSwap(swap) => Some(DexPoolData::Swap(swap)),
62            DefiData::PoolLiquidityUpdate(update) => Some(DexPoolData::LiquidityUpdate(update)),
63            DefiData::PoolFeeCollect(collect) => Some(DexPoolData::FeeCollect(collect)),
64            DefiData::PoolFlash(flash) => Some(DexPoolData::Flash(flash)),
65            _ => None,
66        })
67        .collect();
68
69    events.sort_by(|a, b| {
70        let pos_a = get_event_block_position(a);
71        let pos_b = get_event_block_position(b);
72        pos_a.cmp(&pos_b)
73    });
74
75    events
76}
77
78impl DataEngine {
79    /// Returns all blockchains for which blocks subscriptions exist.
80    #[must_use]
81    pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
82        self.collect_subscriptions(|client| &client.subscriptions_blocks)
83    }
84
85    /// Returns all instrument IDs for which pool subscriptions exist.
86    #[must_use]
87    pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
88        self.collect_subscriptions(|client| &client.subscriptions_pools)
89    }
90
91    /// Returns all instrument IDs for which swap subscriptions exist.
92    #[must_use]
93    pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
94        self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
95    }
96
97    /// Returns all instrument IDs for which liquidity update subscriptions exist.
98    #[must_use]
99    pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
100        self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
101    }
102
103    /// Returns all instrument IDs for which fee collect subscriptions exist.
104    #[must_use]
105    pub fn subscribed_pool_fee_collects(&self) -> Vec<InstrumentId> {
106        self.collect_subscriptions(|client| &client.subscriptions_pool_fee_collects)
107    }
108
109    /// Returns all instrument IDs for which flash loan subscriptions exist.
110    #[must_use]
111    pub fn subscribed_pool_flash(&self) -> Vec<InstrumentId> {
112        self.collect_subscriptions(|client| &client.subscriptions_pool_flash)
113    }
114
115    /// Handles a subscribe command, updating internal state and forwarding to the client.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
120    /// or if the underlying client operation fails.
121    pub fn execute_defi_subscribe(&mut self, cmd: DefiSubscribeCommand) -> anyhow::Result<()> {
122        if let Some(client_id) = cmd.client_id()
123            && self.external_clients.contains(client_id)
124        {
125            if self.config.debug {
126                log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}");
127            }
128            return Ok(());
129        }
130
131        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
132            log::info!("Forwarding subscription to client {}", client.client_id);
133            client.execute_defi_subscribe(cmd.clone());
134        } else {
135            log::error!(
136                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
137                cmd.client_id(),
138                cmd.venue(),
139            );
140        }
141
142        match cmd {
143            DefiSubscribeCommand::Pool(cmd) => {
144                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
145            }
146            DefiSubscribeCommand::PoolSwaps(cmd) => {
147                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
148            }
149            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
150                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
151            }
152            DefiSubscribeCommand::PoolFeeCollects(cmd) => {
153                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
154            }
155            DefiSubscribeCommand::PoolFlashEvents(cmd) => {
156                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
157            }
158            DefiSubscribeCommand::Blocks(_) => {} // No pool setup needed for blocks
159        }
160
161        Ok(())
162    }
163
164    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if the underlying client operation fails.
169    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
170        if let Some(client_id) = cmd.client_id()
171            && self.external_clients.contains(client_id)
172        {
173            if self.config.debug {
174                log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}");
175            }
176            return Ok(());
177        }
178
179        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
180            client.execute_defi_unsubscribe(cmd);
181        } else {
182            log::error!(
183                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
184                cmd.client_id(),
185                cmd.venue(),
186            );
187        }
188
189        Ok(())
190    }
191
192    /// Sends a [`DefiRequestCommand`] to a suitable data client implementation.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if no client is found for the given client ID or venue,
197    /// or if the client fails to process the request.
198    pub fn execute_defi_request(&mut self, req: DefiRequestCommand) -> anyhow::Result<()> {
199        // Skip requests for external clients
200        if let Some(cid) = req.client_id()
201            && self.external_clients.contains(cid)
202        {
203            if self.config.debug {
204                log::debug!("Skipping defi data request for external client {cid}: {req:?}");
205            }
206            return Ok(());
207        }
208
209        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
210            client.execute_defi_request(req)
211        } else {
212            anyhow::bail!(
213                "Cannot handle request: no client found for {:?} {:?}",
214                req.client_id(),
215                req.venue()
216            );
217        }
218    }
219
220    /// Processes DeFi-specific data events.
221    pub fn process_defi_data(&mut self, data: DefiData) {
222        self.increment_data_count();
223
224        match data {
225            DefiData::Block(block) => {
226                let topic = defi::switchboard::get_defi_blocks_topic(block.chain());
227                msgbus::publish_defi_block(topic, &block);
228            }
229            DefiData::Pool(pool) => {
230                if let Err(e) = self.cache.borrow_mut().add_pool(pool.clone()) {
231                    log::error!("Failed to add Pool to cache: {e}");
232                }
233
234                // Check if pool profiler creation was deferred. When a snapshot is also
235                // pending, leave the updater marker in place so a second subscription cannot
236                // race ahead and install a duplicate updater. The snapshot handler clears
237                // both flags and creates the updater once.
238                if self.pool_updaters_pending.contains(&pool.instrument_id) {
239                    if self.pool_snapshot_pending.contains(&pool.instrument_id) {
240                        log::debug!(
241                            "Pool {} loaded; deferring profiler creation to snapshot handler",
242                            pool.instrument_id
243                        );
244                    } else {
245                        self.pool_updaters_pending.remove(&pool.instrument_id);
246                        log::info!(
247                            "Pool {} now loaded, creating deferred pool profiler",
248                            pool.instrument_id
249                        );
250                        self.setup_pool_updater(&pool.instrument_id, None);
251                    }
252                }
253
254                let topic = defi::switchboard::get_defi_pool_topic(pool.instrument_id);
255                msgbus::publish_defi_pool(topic, &pool);
256            }
257            DefiData::PoolSnapshot(snapshot) => {
258                let instrument_id = snapshot.instrument_id;
259                log::info!(
260                    "Received pool snapshot for {instrument_id} at block {} with {} positions and {} ticks",
261                    snapshot.block_position.number,
262                    snapshot.positions.len(),
263                    snapshot.ticks.len()
264                );
265
266                // Validate we're expecting this snapshot
267                if !self.pool_snapshot_pending.contains(&instrument_id) {
268                    log::warn!(
269                        "Received unexpected pool snapshot for {instrument_id} (not in pending set)"
270                    );
271                    return;
272                }
273
274                // Get pool from cache
275                let pool = match self.cache.borrow().pool(&instrument_id) {
276                    Some(pool) => Arc::new(pool.clone()),
277                    None => {
278                        log::error!(
279                            "Pool {instrument_id} not found in cache when processing snapshot"
280                        );
281                        return;
282                    }
283                };
284
285                // Defensive: refuse stub snapshots that slipped past the bootstrap-side
286                // guards. Installing one would leave Python actors observing an initialized
287                // profiler with zero liquidity; better to leave the pool without a profiler
288                // so the bad state is visible.
289                if snapshot.positions.is_empty()
290                    && snapshot.ticks.is_empty()
291                    && snapshot.block_position.number == pool.creation_block
292                {
293                    log::warn!(
294                        "Refusing empty stub snapshot for {instrument_id} at pool creation block {}; pool will remain without profiler",
295                        snapshot.block_position.number,
296                    );
297                    self.pool_snapshot_pending.remove(&instrument_id);
298                    self.pool_updaters_pending.remove(&instrument_id);
299                    self.pool_event_buffers.remove(&instrument_id);
300                    return;
301                }
302
303                // Create profiler and restore from snapshot
304                let mut profiler = PoolProfiler::new(pool);
305                if let Err(e) = profiler.restore_from_snapshot(snapshot.clone()) {
306                    log::error!(
307                        "Failed to restore profiler from snapshot for {instrument_id}: {e}"
308                    );
309                    return;
310                }
311                log::debug!("Restored pool profiler for {instrument_id} from snapshot");
312
313                // Process buffered events
314                let buffered_events = self
315                    .pool_event_buffers
316                    .remove(&instrument_id)
317                    .unwrap_or_default();
318
319                if !buffered_events.is_empty() {
320                    log::info!(
321                        "Processing {} buffered events for {instrument_id}",
322                        buffered_events.len()
323                    );
324
325                    let events_to_apply = convert_and_sort_buffered_events(buffered_events);
326                    let applied_count = Self::apply_buffered_events_to_profiler(
327                        &mut profiler,
328                        events_to_apply,
329                        &snapshot.block_position,
330                        instrument_id,
331                    );
332
333                    log::info!(
334                        "Applied {applied_count} buffered events to profiler for {instrument_id}"
335                    );
336                }
337
338                // Add profiler to cache
339                if let Err(e) = self.cache.borrow_mut().add_pool_profiler(profiler) {
340                    log::error!("Failed to add pool profiler to cache for {instrument_id}: {e}");
341                    return;
342                }
343
344                // Create updater and subscribe to topics
345                self.pool_snapshot_pending.remove(&instrument_id);
346                self.pool_updaters_pending.remove(&instrument_id);
347                let updater = Rc::new(PoolUpdater::new(&instrument_id, self.cache.clone()));
348
349                self.subscribe_pool_updater_topics(instrument_id, updater.clone());
350                self.pool_updaters.insert(instrument_id, updater);
351
352                log::info!(
353                    "Pool profiler setup completed for {instrument_id}, now processing live events"
354                );
355            }
356            DefiData::PoolSwap(swap) => {
357                let instrument_id = swap.instrument_id;
358                // Buffer if waiting for snapshot, otherwise publish
359                if self.pool_snapshot_pending.contains(&instrument_id) {
360                    log::debug!("Buffering swap event for {instrument_id} (waiting for snapshot)");
361                    self.pool_event_buffers
362                        .entry(instrument_id)
363                        .or_default()
364                        .push(DefiData::PoolSwap(swap));
365                } else {
366                    let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
367                    msgbus::publish_defi_swap(topic, &swap);
368                }
369            }
370            DefiData::PoolLiquidityUpdate(update) => {
371                let instrument_id = update.instrument_id;
372                // Buffer if waiting for snapshot, otherwise publish
373                if self.pool_snapshot_pending.contains(&instrument_id) {
374                    log::debug!(
375                        "Buffering liquidity update event for {instrument_id} (waiting for snapshot)"
376                    );
377                    self.pool_event_buffers
378                        .entry(instrument_id)
379                        .or_default()
380                        .push(DefiData::PoolLiquidityUpdate(update));
381                } else {
382                    let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
383                    msgbus::publish_defi_liquidity(topic, &update);
384                }
385            }
386            DefiData::PoolFeeCollect(collect) => {
387                let instrument_id = collect.instrument_id;
388                // Buffer if waiting for snapshot, otherwise publish
389                if self.pool_snapshot_pending.contains(&instrument_id) {
390                    log::debug!(
391                        "Buffering fee collect event for {instrument_id} (waiting for snapshot)"
392                    );
393                    self.pool_event_buffers
394                        .entry(instrument_id)
395                        .or_default()
396                        .push(DefiData::PoolFeeCollect(collect));
397                } else {
398                    let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
399                    msgbus::publish_defi_collect(topic, &collect);
400                }
401            }
402            DefiData::PoolFlash(flash) => {
403                let instrument_id = flash.instrument_id;
404                // Buffer if waiting for snapshot, otherwise publish
405                if self.pool_snapshot_pending.contains(&instrument_id) {
406                    log::debug!("Buffering flash event for {instrument_id} (waiting for snapshot)");
407                    self.pool_event_buffers
408                        .entry(instrument_id)
409                        .or_default()
410                        .push(DefiData::PoolFlash(flash));
411                } else {
412                    let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
413                    msgbus::publish_defi_flash(topic, &flash);
414                }
415            }
416        }
417    }
418
419    /// Subscribes a pool updater to all relevant pool data topics using typed handlers.
420    fn subscribe_pool_updater_topics(&self, instrument_id: InstrumentId, updater: Rc<PoolUpdater>) {
421        let priority = Some(self.msgbus_priority);
422
423        // Subscribe swap handler
424        let swap_topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
425        let swap_handler = TypedHandler(Rc::new(PoolSwapHandler::new(updater.clone())));
426        msgbus::subscribe_defi_swaps(swap_topic.into(), swap_handler, priority);
427
428        // Subscribe liquidity handler
429        let liq_topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
430        let liq_handler = TypedHandler(Rc::new(PoolLiquidityHandler::new(updater.clone())));
431        msgbus::subscribe_defi_liquidity(liq_topic.into(), liq_handler, priority);
432
433        // Subscribe collect handler
434        let collect_topic = defi::switchboard::get_defi_collect_topic(instrument_id);
435        let collect_handler = TypedHandler(Rc::new(PoolCollectHandler::new(updater.clone())));
436        msgbus::subscribe_defi_collects(collect_topic.into(), collect_handler, priority);
437
438        // Subscribe flash handler
439        let flash_topic = defi::switchboard::get_defi_flash_topic(instrument_id);
440        let flash_handler = TypedHandler(Rc::new(PoolFlashHandler::new(updater)));
441        msgbus::subscribe_defi_flash(flash_topic.into(), flash_handler, priority);
442    }
443
444    /// Applies buffered events to a pool profiler, filtering to events after the snapshot.
445    ///
446    /// Returns the count of successfully applied events.
447    fn apply_buffered_events_to_profiler(
448        profiler: &mut PoolProfiler,
449        events: Vec<DexPoolData>,
450        snapshot_block: &BlockPosition,
451        instrument_id: InstrumentId,
452    ) -> usize {
453        let mut applied_count = 0;
454
455        for event in events {
456            let event_block = get_event_block_position(&event);
457
458            // Only apply events that occurred after the snapshot
459            let is_after_snapshot = event_block.0 > snapshot_block.number
460                || (event_block.0 == snapshot_block.number
461                    && event_block.1 > snapshot_block.transaction_index)
462                || (event_block.0 == snapshot_block.number
463                    && event_block.1 == snapshot_block.transaction_index
464                    && event_block.2 > snapshot_block.log_index);
465
466            if is_after_snapshot {
467                if let Err(e) = profiler.process(&event) {
468                    log::error!(
469                        "Failed to apply buffered event to profiler for {instrument_id}: {e}"
470                    );
471                } else {
472                    applied_count += 1;
473                }
474            }
475        }
476
477        applied_count
478    }
479
480    fn setup_pool_updater(&mut self, instrument_id: &InstrumentId, client_id: Option<&ClientId>) {
481        // Early return if updater already exists or we are in the middle of setting it up.
482        if self.pool_updaters.contains_key(instrument_id)
483            || self.pool_updaters_pending.contains(instrument_id)
484        {
485            log::debug!("Pool updater for {instrument_id} already exists");
486            return;
487        }
488
489        log::info!("Setting up pool updater for {instrument_id}");
490
491        // Check cache state and ensure profiler exists
492        {
493            let mut cache = self.cache.borrow_mut();
494
495            if cache.pool_profiler(instrument_id).is_some() {
496                // Profiler already exists, proceed to create updater
497                log::debug!("Pool profiler already exists for {instrument_id}");
498            } else if let Some(pool) = cache.pool(instrument_id) {
499                // Pool exists but no profiler, create profiler from pool
500                let pool = Arc::new(pool.clone());
501                let mut pool_profiler = PoolProfiler::new(pool.clone());
502
503                if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
504                    if let Err(e) = pool_profiler.initialize(initial_sqrt_price_x96) {
505                        log::error!("Failed to initialize pool profiler for {instrument_id}: {e}");
506                        drop(cache);
507                        return;
508                    }
509                    log::debug!(
510                        "Initialized pool profiler for {instrument_id} with sqrt_price {initial_sqrt_price_x96}"
511                    );
512                } else {
513                    log::debug!("Created pool profiler for {instrument_id}");
514                }
515
516                if let Err(e) = cache.add_pool_profiler(pool_profiler) {
517                    log::error!("Failed to add pool profiler for {instrument_id}: {e}");
518                    drop(cache);
519                    return;
520                }
521                drop(cache);
522            } else {
523                // Neither profiler nor pool exists, request snapshot
524                drop(cache);
525
526                let request_id = UUID4::new();
527                let ts_init = self.clock.borrow().timestamp_ns();
528                let request = RequestPoolSnapshot::new(
529                    *instrument_id,
530                    client_id.copied(),
531                    request_id,
532                    ts_init,
533                    None,
534                );
535
536                if let Err(e) = self.execute_defi_request(DefiRequestCommand::PoolSnapshot(request))
537                {
538                    log::warn!("Failed to request pool snapshot for {instrument_id}: {e}");
539                } else {
540                    log::debug!("Requested pool snapshot for {instrument_id}");
541                    self.pool_snapshot_pending.insert(*instrument_id);
542                    self.pool_updaters_pending.insert(*instrument_id);
543                    self.pool_event_buffers.entry(*instrument_id).or_default();
544                }
545                return;
546            }
547        }
548
549        // Profiler exists, create updater and subscribe to topics
550        let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
551
552        self.subscribe_pool_updater_topics(*instrument_id, updater.clone());
553        self.pool_updaters.insert(*instrument_id, updater);
554
555        log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
556    }
557}
558
559#[cfg(test)]
560mod tests {
561    use std::sync::Arc;
562
563    use alloy_primitives::{Address, I256, U160, U256};
564    use nautilus_core::UnixNanos;
565    use nautilus_model::{
566        defi::{
567            Chain, DefiData, PoolFeeCollect, PoolFlash, PoolIdentifier, PoolLiquidityUpdate,
568            PoolLiquidityUpdateType, PoolSwap,
569            chain::chains,
570            data::DexPoolData,
571            dex::{AmmType, Dex, DexType},
572        },
573        identifiers::{InstrumentId, Symbol, Venue},
574    };
575    use rstest::*;
576
577    use super::*;
578
579    #[fixture]
580    fn test_instrument_id() -> InstrumentId {
581        InstrumentId::new(Symbol::from("ETH/USDC"), Venue::from("UNISWAPV3"))
582    }
583
584    #[fixture]
585    fn test_chain() -> Arc<Chain> {
586        Arc::new(chains::ETHEREUM.clone())
587    }
588
589    #[fixture]
590    fn test_dex(test_chain: Arc<Chain>) -> Arc<Dex> {
591        Arc::new(Dex::new(
592            (*test_chain).clone(),
593            DexType::UniswapV3,
594            "0x1F98431c8aD98523631AE4a59f267346ea31F984",
595            12369621,
596            AmmType::CLAMM,
597            "PoolCreated(address,address,uint24,int24,address)",
598            "Swap(address,address,int256,int256,uint160,uint128,int24)",
599            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
600            "Burn(address,int24,int24,uint128,uint256,uint256)",
601            "Collect(address,address,int24,int24,uint128,uint128)",
602        ))
603    }
604
605    fn create_test_swap(
606        test_instrument_id: InstrumentId,
607        test_chain: Arc<Chain>,
608        test_dex: Arc<Dex>,
609        block: u64,
610        tx_index: u32,
611        log_index: u32,
612    ) -> PoolSwap {
613        PoolSwap::new(
614            test_chain,
615            test_dex,
616            test_instrument_id,
617            PoolIdentifier::from_address(Address::ZERO),
618            block,
619            format!("0x{block:064x}"),
620            tx_index,
621            log_index,
622            UnixNanos::default(),
623            UnixNanos::default(),
624            Address::ZERO,
625            Address::ZERO,
626            I256::ZERO,
627            I256::ZERO,
628            U160::ZERO,
629            0,
630            0,
631        )
632    }
633
634    fn create_test_liquidity_update(
635        test_instrument_id: InstrumentId,
636        test_chain: Arc<Chain>,
637        test_dex: Arc<Dex>,
638        block: u64,
639        tx_index: u32,
640        log_index: u32,
641    ) -> PoolLiquidityUpdate {
642        PoolLiquidityUpdate::new(
643            test_chain,
644            test_dex,
645            test_instrument_id,
646            PoolIdentifier::from_address(Address::ZERO),
647            PoolLiquidityUpdateType::Mint,
648            block,
649            format!("0x{block:064x}"),
650            tx_index,
651            log_index,
652            None,
653            Address::ZERO,
654            0,
655            U256::ZERO,
656            U256::ZERO,
657            0,
658            0,
659            UnixNanos::default(),
660            UnixNanos::default(),
661        )
662    }
663
664    fn create_test_fee_collect(
665        test_instrument_id: InstrumentId,
666        test_chain: Arc<Chain>,
667        test_dex: Arc<Dex>,
668        block: u64,
669        tx_index: u32,
670        log_index: u32,
671    ) -> PoolFeeCollect {
672        PoolFeeCollect::new(
673            test_chain,
674            test_dex,
675            test_instrument_id,
676            PoolIdentifier::from_address(Address::ZERO),
677            block,
678            format!("0x{block:064x}"),
679            tx_index,
680            log_index,
681            Address::ZERO,
682            0,
683            0,
684            0,
685            0,
686            UnixNanos::default(),
687            UnixNanos::default(),
688        )
689    }
690
691    fn create_test_flash(
692        test_instrument_id: InstrumentId,
693        test_chain: Arc<Chain>,
694        test_dex: Arc<Dex>,
695        block: u64,
696        tx_index: u32,
697        log_index: u32,
698    ) -> PoolFlash {
699        PoolFlash::new(
700            test_chain,
701            test_dex,
702            test_instrument_id,
703            PoolIdentifier::from_address(Address::ZERO),
704            block,
705            format!("0x{block:064x}"),
706            tx_index,
707            log_index,
708            UnixNanos::default(),
709            UnixNanos::default(),
710            Address::ZERO,
711            Address::ZERO,
712            U256::ZERO,
713            U256::ZERO,
714            U256::ZERO,
715            U256::ZERO,
716        )
717    }
718
719    #[rstest]
720    fn test_get_event_block_position_swap(
721        test_instrument_id: InstrumentId,
722        test_chain: Arc<Chain>,
723        test_dex: Arc<Dex>,
724    ) {
725        let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
726        let pos = get_event_block_position(&DexPoolData::Swap(swap));
727        assert_eq!(pos, (100, 5, 3));
728    }
729
730    #[rstest]
731    fn test_get_event_block_position_liquidity_update(
732        test_instrument_id: InstrumentId,
733        test_chain: Arc<Chain>,
734        test_dex: Arc<Dex>,
735    ) {
736        let update =
737            create_test_liquidity_update(test_instrument_id, test_chain, test_dex, 200, 10, 7);
738        let pos = get_event_block_position(&DexPoolData::LiquidityUpdate(update));
739        assert_eq!(pos, (200, 10, 7));
740    }
741
742    #[rstest]
743    fn test_get_event_block_position_fee_collect(
744        test_instrument_id: InstrumentId,
745        test_chain: Arc<Chain>,
746        test_dex: Arc<Dex>,
747    ) {
748        let collect = create_test_fee_collect(test_instrument_id, test_chain, test_dex, 300, 15, 2);
749        let pos = get_event_block_position(&DexPoolData::FeeCollect(collect));
750        assert_eq!(pos, (300, 15, 2));
751    }
752
753    #[rstest]
754    fn test_get_event_block_position_flash(
755        test_instrument_id: InstrumentId,
756        test_chain: Arc<Chain>,
757        test_dex: Arc<Dex>,
758    ) {
759        let flash = create_test_flash(test_instrument_id, test_chain, test_dex, 400, 20, 8);
760        let pos = get_event_block_position(&DexPoolData::Flash(flash));
761        assert_eq!(pos, (400, 20, 8));
762    }
763
764    #[rstest]
765    fn test_convert_and_sort_empty_events() {
766        let events = convert_and_sort_buffered_events(vec![]);
767        assert!(events.is_empty());
768    }
769
770    #[rstest]
771    fn test_convert_and_sort_filters_non_pool_events(
772        test_instrument_id: InstrumentId,
773        test_chain: Arc<Chain>,
774        test_dex: Arc<Dex>,
775    ) {
776        let events = vec![
777            DefiData::PoolSwap(create_test_swap(
778                test_instrument_id,
779                test_chain,
780                test_dex,
781                100,
782                0,
783                0,
784            )),
785            // Block events would be filtered out
786        ];
787        let sorted = convert_and_sort_buffered_events(events);
788        assert_eq!(sorted.len(), 1);
789    }
790
791    #[rstest]
792    fn test_convert_and_sort_single_event(
793        test_instrument_id: InstrumentId,
794        test_chain: Arc<Chain>,
795        test_dex: Arc<Dex>,
796    ) {
797        let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
798        let events = vec![DefiData::PoolSwap(swap)];
799        let sorted = convert_and_sort_buffered_events(events);
800        assert_eq!(sorted.len(), 1);
801        assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 3));
802    }
803
804    #[rstest]
805    fn test_convert_and_sort_already_sorted(
806        test_instrument_id: InstrumentId,
807        test_chain: Arc<Chain>,
808        test_dex: Arc<Dex>,
809    ) {
810        let events = vec![
811            DefiData::PoolSwap(create_test_swap(
812                test_instrument_id,
813                test_chain.clone(),
814                test_dex.clone(),
815                100,
816                0,
817                0,
818            )),
819            DefiData::PoolSwap(create_test_swap(
820                test_instrument_id,
821                test_chain.clone(),
822                test_dex.clone(),
823                100,
824                0,
825                1,
826            )),
827            DefiData::PoolSwap(create_test_swap(
828                test_instrument_id,
829                test_chain,
830                test_dex,
831                100,
832                1,
833                0,
834            )),
835        ];
836        let sorted = convert_and_sort_buffered_events(events);
837        assert_eq!(sorted.len(), 3);
838        assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
839        assert_eq!(get_event_block_position(&sorted[1]), (100, 0, 1));
840        assert_eq!(get_event_block_position(&sorted[2]), (100, 1, 0));
841    }
842
843    #[rstest]
844    fn test_convert_and_sort_reverse_order(
845        test_instrument_id: InstrumentId,
846        test_chain: Arc<Chain>,
847        test_dex: Arc<Dex>,
848    ) {
849        let events = vec![
850            DefiData::PoolSwap(create_test_swap(
851                test_instrument_id,
852                test_chain.clone(),
853                test_dex.clone(),
854                100,
855                2,
856                5,
857            )),
858            DefiData::PoolSwap(create_test_swap(
859                test_instrument_id,
860                test_chain.clone(),
861                test_dex.clone(),
862                100,
863                1,
864                3,
865            )),
866            DefiData::PoolSwap(create_test_swap(
867                test_instrument_id,
868                test_chain,
869                test_dex,
870                100,
871                0,
872                1,
873            )),
874        ];
875        let sorted = convert_and_sort_buffered_events(events);
876        assert_eq!(sorted.len(), 3);
877        assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 1));
878        assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 3));
879        assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 5));
880    }
881
882    #[rstest]
883    fn test_convert_and_sort_mixed_blocks(
884        test_instrument_id: InstrumentId,
885        test_chain: Arc<Chain>,
886        test_dex: Arc<Dex>,
887    ) {
888        let events = vec![
889            DefiData::PoolSwap(create_test_swap(
890                test_instrument_id,
891                test_chain.clone(),
892                test_dex.clone(),
893                102,
894                0,
895                0,
896            )),
897            DefiData::PoolSwap(create_test_swap(
898                test_instrument_id,
899                test_chain.clone(),
900                test_dex.clone(),
901                100,
902                5,
903                2,
904            )),
905            DefiData::PoolSwap(create_test_swap(
906                test_instrument_id,
907                test_chain,
908                test_dex,
909                101,
910                3,
911                1,
912            )),
913        ];
914        let sorted = convert_and_sort_buffered_events(events);
915        assert_eq!(sorted.len(), 3);
916        assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 2));
917        assert_eq!(get_event_block_position(&sorted[1]), (101, 3, 1));
918        assert_eq!(get_event_block_position(&sorted[2]), (102, 0, 0));
919    }
920
921    #[rstest]
922    fn test_convert_and_sort_mixed_event_types(
923        test_instrument_id: InstrumentId,
924        test_chain: Arc<Chain>,
925        test_dex: Arc<Dex>,
926    ) {
927        let events = vec![
928            DefiData::PoolSwap(create_test_swap(
929                test_instrument_id,
930                test_chain.clone(),
931                test_dex.clone(),
932                100,
933                2,
934                0,
935            )),
936            DefiData::PoolLiquidityUpdate(create_test_liquidity_update(
937                test_instrument_id,
938                test_chain.clone(),
939                test_dex.clone(),
940                100,
941                0,
942                0,
943            )),
944            DefiData::PoolFeeCollect(create_test_fee_collect(
945                test_instrument_id,
946                test_chain.clone(),
947                test_dex.clone(),
948                100,
949                1,
950                0,
951            )),
952            DefiData::PoolFlash(create_test_flash(
953                test_instrument_id,
954                test_chain,
955                test_dex,
956                100,
957                3,
958                0,
959            )),
960        ];
961        let sorted = convert_and_sort_buffered_events(events);
962        assert_eq!(sorted.len(), 4);
963        assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
964        assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 0));
965        assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 0));
966        assert_eq!(get_event_block_position(&sorted[3]), (100, 3, 0));
967    }
968
969    #[rstest]
970    fn test_convert_and_sort_same_block_and_tx_different_log_index(
971        test_instrument_id: InstrumentId,
972        test_chain: Arc<Chain>,
973        test_dex: Arc<Dex>,
974    ) {
975        let events = vec![
976            DefiData::PoolSwap(create_test_swap(
977                test_instrument_id,
978                test_chain.clone(),
979                test_dex.clone(),
980                100,
981                5,
982                10,
983            )),
984            DefiData::PoolSwap(create_test_swap(
985                test_instrument_id,
986                test_chain.clone(),
987                test_dex.clone(),
988                100,
989                5,
990                5,
991            )),
992            DefiData::PoolSwap(create_test_swap(
993                test_instrument_id,
994                test_chain,
995                test_dex,
996                100,
997                5,
998                1,
999            )),
1000        ];
1001        let sorted = convert_and_sort_buffered_events(events);
1002        assert_eq!(sorted.len(), 3);
1003        assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 1));
1004        assert_eq!(get_event_block_position(&sorted[1]), (100, 5, 5));
1005        assert_eq!(get_event_block_position(&sorted[2]), (100, 5, 10));
1006    }
1007}