// Source file: tycho_simulation/evm/decoder.rs

1use std::{
2    collections::{hash_map::Entry, HashMap, HashSet},
3    future::Future,
4    pin::Pin,
5    sync::Arc,
6};
7
8use alloy::primitives::{Address, U256};
9use thiserror::Error;
10use tokio::sync::{RwLock, RwLockReadGuard};
11use tracing::{debug, error, info, warn};
12use tycho_client::feed::{synchronizer::ComponentWithState, BlockHeader, FeedMessage, HeaderLike};
13use tycho_common::{
14    dto::{ChangeType, ProtocolStateDelta},
15    models::{blockchain::BlockAggregatedChanges, token::Token, Chain},
16    simulation::protocol_sim::{Balances, ProtocolSim},
17    Bytes,
18};
19#[cfg(test)]
20use {
21    mockall::mock,
22    num_bigint::BigUint,
23    std::any::Any,
24    tycho_common::simulation::{
25        errors::{SimulationError, TransitionError},
26        protocol_sim::GetAmountOutResult,
27    },
28};
29
30use crate::{
31    evm::{
32        engine_db::{update_engine, SHARED_TYCHO_DB},
33        protocol::{
34            utils::bytes_to_address,
35            vm::{constants::ERC20_PROXY_BYTECODE, erc20_token::IMPLEMENTATION_SLOT},
36        },
37        tycho_models::{AccountUpdate, ResponseAccount},
38    },
39    protocol::{
40        errors::InvalidSnapshotError,
41        models::{DecoderContext, ProtocolComponent, TryFromWithBlock, Update},
42    },
43};
44
/// Error type returned by `TychoStreamDecoder::decode` when a feed message cannot be decoded.
#[derive(Error, Debug)]
pub enum StreamDecodeError {
    /// Unrecoverable decoding failure. The wrapped string is the human-readable reason and is
    /// used verbatim as the error's `Display` output.
    #[error("{0}")]
    Fatal(String),
}
50
/// Mutable bookkeeping the decoder keeps between calls to `decode()`.
///
/// It is shared behind an `Arc<RwLock<_>>` so that registered decoders can read it (e.g. the
/// token map) while decoding a snapshot.
#[derive(Default)]
struct DecoderState {
    /// Known tokens, keyed by token address.
    tokens: HashMap<Bytes, Token>,
    /// Protocol simulation states, keyed by component id.
    states: HashMap<String, Box<dyn ProtocolSim>>,
    /// Tracked protocol components, keyed by component id.
    components: HashMap<String, ProtocolComponent>,
    /// Maps each contract address to the ids of the pools it affects.
    contracts_map: HashMap<Bytes, HashSet<String>>,
    /// Maps an original token address to the address its original (implementation) contract
    /// was relocated to when a proxy contract was installed at the original address.
    proxy_token_addresses: HashMap<Address, Address>,
    /// Set of failed components, i.e. components that failed to decode and will not be emitted
    /// again.
    ///
    /// TODO: handle more gracefully inside tycho-client. We could fetch the snapshot and try to
    /// decode it again.
    failed_components: HashSet<String>,
    /// The block number of the last confirmed block decoded via `decode()`.
    current_block_number: u64,
}
67
/// Boxed, pinned future returned by a registered decoder. It resolves to the decoded protocol
/// state or to an `InvalidSnapshotError`.
type DecodeFut =
    Pin<Box<dyn Future<Output = Result<Box<dyn ProtocolSim>, InvalidSnapshotError>> + Send + Sync>>;
/// Token balances per contract: contract address -> (token address -> balance).
type AccountBalances = HashMap<Bytes, HashMap<Bytes, Bytes>>;
/// Signature of a registered state decoder: it receives the component snapshot, the header,
/// the account balances and a handle to the shared decoder state, and returns a `DecodeFut`.
type RegistryFn<H> = dyn Fn(ComponentWithState, H, AccountBalances, Arc<RwLock<DecoderState>>) -> DecodeFut
    + Send
    + Sync;
/// Client-side component predicate. A snapshot is skipped by `decode()` when the predicate
/// registered for its protocol returns `false`.
type FilterFn = fn(&ComponentWithState) -> bool;
75
/// A decoder to process raw messages.
///
/// This struct decodes incoming messages of type `FeedMessage` and converts them into an
/// `Update`.
///
/// # Important:
/// - Supports registering exchanges and their associated filters for specific protocol components.
/// - Allows the addition of client-side filters for custom conditions.
///
/// **Note:** Tokens provided via [`set_tokens`](Self::set_tokens) are used to decode startup
/// snapshots and initialize protocol states. This is not an ongoing filter — components arriving
/// after startup include their own token metadata.
pub struct TychoStreamDecoder<H>
where
    H: HeaderLike,
{
    /// Shared mutable decoder state (tokens, components, states, ...).
    state: Arc<RwLock<DecoderState>>,
    /// When `true`, a component whose state fails to decode (or whose protocol has no registered
    /// decoder) is logged and skipped; when `false`, `decode()` returns a fatal error instead.
    skip_state_decode_failures: bool,
    /// Minimum quality a token arriving in stream deltas must have to be added to the token map.
    min_token_quality: u32,
    /// Registered state decoders, keyed by exchange (protocol) identifier.
    registry: HashMap<String, Box<RegistryFn<H>>>,
    /// Registered client-side filters, keyed by exchange (protocol) identifier.
    inclusion_filters: HashMap<String, FilterFn>,
}
98
/// `Default` simply delegates to `TychoStreamDecoder::new`.
impl<H> Default for TychoStreamDecoder<H>
where
    H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
{
    /// Builds a decoder with the same configuration as `new()`.
    fn default() -> Self {
        Self::new()
    }
}
107
108impl<H> TychoStreamDecoder<H>
109where
110    H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
111{
112    pub fn new() -> Self {
113        Self {
114            state: Arc::new(RwLock::new(DecoderState::default())),
115            skip_state_decode_failures: false,
116            min_token_quality: 100,
117            registry: HashMap::new(),
118            inclusion_filters: HashMap::new(),
119        }
120    }
121
122    /// Provides token metadata used to decode startup snapshots and initialize protocol states.
123    ///
124    /// This is not an ongoing stream filter. Components arriving after startup include their
125    /// own token metadata for decoding.
126    pub async fn set_tokens(&self, tokens: HashMap<Bytes, Token>) {
127        let mut guard = self.state.write().await;
128        guard.tokens = tokens;
129    }
130
    /// Configures how `decode()` reacts when a component's state cannot be decoded or when no
    /// decoder is registered for its protocol.
    ///
    /// With `skip = true` the component is logged as a warning and skipped; with `skip = false`
    /// (the default) `decode()` returns a fatal error.
    pub fn skip_state_decode_failures(&mut self, skip: bool) {
        self.skip_state_decode_failures = skip;
    }
134
    /// Sets the minimum token quality for decoding.
    ///
    /// Tokens arriving in stream deltas with a quality below this threshold are ignored (a token
    /// is accepted when its quality is greater than or equal to the threshold). Defaults to 100.
    /// Set this to the same value used in [`load_all_tokens()`](crate::utils::load_all_tokens) to
    /// apply consistent filtering.
    pub fn min_token_quality(&mut self, quality: u32) {
        self.min_token_quality = quality;
    }
143
144    /// Registers a decoder for a given exchange with a decoder context.
145    ///
146    /// This method maps an exchange identifier to a specific protocol simulation type.
147    /// The associated type must implement the `TryFromWithBlock` trait to enable decoding
148    /// of state updates from `ComponentWithState` objects. This allows the decoder to transform
149    /// the component data into the appropriate protocol simulation type based on the current
150    /// blockchain state and the provided block header.
151    /// For example, to register a decoder for the `uniswap_v2` exchange with an additional decoder
152    /// context, you must call this function with
153    /// `register_decoder_with_context::<UniswapV2State>("uniswap_v2", context)`.
154    /// This ensures that the exchange ID `uniswap_v2` is properly associated with the
155    /// `UniswapV2State` decoder for use in the protocol stream.
156    pub fn register_decoder_with_context<T>(&mut self, exchange: &str, context: DecoderContext)
157    where
158        T: ProtocolSim
159            + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
160            + Send
161            + 'static,
162    {
163        let decoder = Box::new(
164            move |component: ComponentWithState,
165                  header: H,
166                  account_balances: AccountBalances,
167                  state: Arc<RwLock<DecoderState>>| {
168                let context = context.clone();
169                Box::pin(async move {
170                    let guard = state.read().await;
171                    T::try_from_with_header(
172                        component,
173                        header,
174                        &account_balances,
175                        &guard.tokens,
176                        &context,
177                    )
178                    .await
179                    .map(|c| Box::new(c) as Box<dyn ProtocolSim>)
180                }) as DecodeFut
181            },
182        );
183        self.registry
184            .insert(exchange.to_string(), decoder);
185    }
186
    /// Registers a decoder for a given exchange.
    ///
    /// This method maps an exchange identifier to a specific protocol simulation type.
    /// The associated type must implement the `TryFromWithBlock` trait to enable decoding
    /// of state updates from `ComponentWithState` objects. This allows the decoder to transform
    /// the component data into the appropriate protocol simulation type based on the current
    /// blockchain state and the provided block header.
    /// For example, to register a decoder for the `uniswap_v2` exchange, you must call
    /// this function with `register_decoder::<UniswapV2State>("uniswap_v2")`.
    /// This ensures that the exchange ID `uniswap_v2` is properly associated with the
    /// `UniswapV2State` decoder for use in the protocol stream.
    ///
    /// This is a convenience wrapper around
    /// [`register_decoder_with_context`](Self::register_decoder_with_context) that passes a
    /// freshly created `DecoderContext`.
    pub fn register_decoder<T>(&mut self, exchange: &str)
    where
        T: ProtocolSim
            + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
            + Send
            + 'static,
    {
        let context = DecoderContext::new();
        self.register_decoder_with_context::<T>(exchange, context);
    }
208
209    /// Registers a client-side filter function for a given exchange.
210    ///
211    /// Associates a filter function with an exchange ID, enabling custom filtering of protocol
212    /// components. The filter function is applied client-side to refine the data received from the
213    /// stream. It can be used to exclude certain components based on attributes or conditions that
214    /// are not supported by the server-side filtering logic. This is particularly useful for
215    /// implementing custom behaviors, such as:
216    /// - Filtering out pools with specific attributes (e.g., unsupported features).
217    /// - Blacklisting pools based on custom criteria.
218    /// - Excluding pools that do not meet certain requirements (e.g., token pairs or liquidity
219    ///   constraints).
220    ///
221    /// For example, you might use a filter to exclude pools that are not fully supported in the
222    /// protocol, or to ignore pools with certain attributes that are irrelevant to your
223    /// application.
224    pub fn register_filter(&mut self, exchange: &str, predicate: FilterFn) {
225        self.inclusion_filters
226            .insert(exchange.to_string(), predicate);
227    }
228
229    /// Decodes a `FeedMessage` into a `BlockUpdate` containing the updated states of protocol
230    /// components
231    pub async fn decode(&self, msg: &FeedMessage<H>) -> Result<Update, StreamDecodeError> {
232        // stores all states updated in this tick/msg
233        let mut updated_states = HashMap::new();
234        let mut new_pairs = HashMap::new();
235        let mut removed_pairs = HashMap::new();
236        let mut contracts_map = HashMap::new();
237        let mut msg_failed_components = HashSet::new();
238
239        let header = msg
240            .state_msgs
241            .values()
242            .next()
243            .ok_or_else(|| StreamDecodeError::Fatal("Missing block!".into()))?
244            .header
245            .clone();
246
247        let block_number_or_timestamp = header
248            .clone()
249            .block_number_or_timestamp();
250        let current_block = header.clone().block();
251        let is_partial = current_block
252            .as_ref()
253            .map(|h| h.partial_block_index.is_some())
254            .unwrap_or(false);
255
256        for (protocol, protocol_msg) in msg.state_msgs.iter() {
257            // Add any new tokens
258            if let Some(deltas) = protocol_msg.deltas.as_ref() {
259                let mut state_guard = self.state.write().await;
260
261                let new_tokens = deltas
262                    .new_tokens
263                    .iter()
264                    .filter(|(addr, t)| {
265                        t.quality >= self.min_token_quality &&
266                            !state_guard.tokens.contains_key(*addr)
267                    })
268                    .map(|(addr, t)| (addr.clone(), t.clone()))
269                    .collect::<HashMap<Bytes, Token>>();
270
271                if !new_tokens.is_empty() {
272                    debug!(n = new_tokens.len(), "NewTokens");
273                    state_guard.tokens.extend(new_tokens);
274                }
275            }
276
277            // Remove untracked components
278            {
279                let mut state_guard = self.state.write().await;
280                let removed_components: Vec<(String, ProtocolComponent)> = protocol_msg
281                    .removed_components
282                    .iter()
283                    .map(|(id, comp)| {
284                        if *id != comp.id {
285                            error!(
286                                "Component id mismatch in removed components {id} != {}",
287                                comp.id
288                            );
289                            return Err(StreamDecodeError::Fatal("Component id mismatch".into()));
290                        }
291
292                        let tokens = comp
293                            .tokens
294                            .iter()
295                            .flat_map(|addr| state_guard.tokens.get(addr).cloned())
296                            .collect::<Vec<_>>();
297
298                        if tokens.len() == comp.tokens.len() {
299                            Ok(Some((
300                                id.clone(),
301                                ProtocolComponent::from_with_tokens(comp.clone(), tokens),
302                            )))
303                        } else {
304                            Ok(None)
305                        }
306                    })
307                    .collect::<Result<Vec<Option<(String, ProtocolComponent)>>, StreamDecodeError>>(
308                    )?
309                    .into_iter()
310                    .flatten()
311                    .collect();
312
313                // Remove components from state and add to removed_pairs
314                for (id, component) in removed_components {
315                    state_guard.components.remove(&id);
316                    state_guard.states.remove(&id);
317                    removed_pairs.insert(id, component);
318                }
319
320                // UPDATE VM STORAGE
321                info!(
322                    "Processing {} contracts from snapshots",
323                    protocol_msg
324                        .snapshots
325                        .get_vm_storage()
326                        .len()
327                );
328
329                let mut proxy_token_accounts: HashMap<Address, AccountUpdate> = HashMap::new();
330                let mut storage_by_address: HashMap<Address, ResponseAccount> = HashMap::new();
331                for (key, value) in protocol_msg
332                    .snapshots
333                    .get_vm_storage()
334                    .iter()
335                {
336                    let account: ResponseAccount = value.clone().into();
337
338                    if state_guard.tokens.contains_key(key) {
339                        let original_address = account.address;
340                        // To work with Tycho's token overwrites system, if we get account
341                        // snapshots for a token we must handle them with a proxy/wrapper
342                        // contract.
343                        // Note: storage for the original contract must be set at the proxy
344                        // contract address. This is because the proxy contract uses
345                        // delegatecall to the original (implementation) contract.
346
347                        // Handle proxy token accounts
348                        let (impl_addr, proxy_state) = match state_guard
349                            .proxy_token_addresses
350                            .get(&original_address)
351                        {
352                            Some(impl_addr) => {
353                                // Token already has a proxy contract, simply update it.
354
355                                // Note: we apply the snapshot as an update. This is to cover the
356                                // case where a contract may be stale as it stopped being tracked
357                                // for some reason (e.g. due to a drop in tvl) and is now being
358                                // tracked again.
359                                let proxy_state = AccountUpdate::new(
360                                    original_address,
361                                    value.chain,
362                                    account.slots.clone(),
363                                    Some(account.native_balance),
364                                    None,
365                                    ChangeType::Update,
366                                );
367                                (*impl_addr, proxy_state)
368                            }
369                            None => {
370                                // Token does not have a proxy contract yet, create one
371
372                                // Assign original token contract to new address
373                                let impl_addr = generate_proxy_token_address(
374                                    state_guard.proxy_token_addresses.len() as u32,
375                                )?;
376                                state_guard
377                                    .proxy_token_addresses
378                                    .insert(original_address, impl_addr);
379
380                                // Add proxy token contract at original token address
381                                let proxy_state = create_proxy_token_account(
382                                    original_address,
383                                    Some(impl_addr),
384                                    &account.slots,
385                                    value.chain,
386                                    Some(account.native_balance),
387                                );
388
389                                (impl_addr, proxy_state)
390                            }
391                        };
392
393                        proxy_token_accounts.insert(original_address, proxy_state);
394
395                        // Assign original token contract to the implementation address
396                        let impl_update = ResponseAccount {
397                            address: impl_addr,
398                            slots: HashMap::new(),
399                            ..account.clone()
400                        };
401                        storage_by_address.insert(impl_addr, impl_update);
402                    } else {
403                        // Not a token, apply snapshot to the account at its original address
404                        storage_by_address.insert(account.address, account);
405                    }
406                }
407
408                // Split proxy accounts by change type:
409                // - Creation: new proxies that must overwrite any existing placeholder
410                // - Update: existing proxies whose storage is being refreshed (handled normally)
411                let mut proxy_creates: Vec<AccountUpdate> = Vec::new();
412                let mut proxy_updates: HashMap<Address, AccountUpdate> = HashMap::new();
413                for (addr, update) in proxy_token_accounts {
414                    if matches!(update.change, ChangeType::Creation) {
415                        proxy_creates.push(update);
416                    } else {
417                        proxy_updates.insert(addr, update);
418                    }
419                }
420
421                info!("Updating engine with {} contracts from snapshots", storage_by_address.len());
422                update_engine(
423                    SHARED_TYCHO_DB.clone(),
424                    header.clone().block(),
425                    Some(storage_by_address),
426                    proxy_updates,
427                )
428                .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
429
430                // Force-overwrite new proxy token accounts so that authoritative vm_storage data
431                // always wins over any empty placeholder previously inserted by another
432                // decoder's snapshot loop (which uses init_account / init-if-not-exists).
433                if !proxy_creates.is_empty() {
434                    SHARED_TYCHO_DB
435                        .force_update_accounts(proxy_creates)
436                        .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
437                }
438                info!("Engine updated");
439                drop(state_guard);
440            }
441
442            // Construct a contract to token balances map: HashMap<ContractAddress,
443            // HashMap<TokenAddress, Balance>>
444            let account_balances = protocol_msg
445                .clone()
446                .snapshots
447                .get_vm_storage()
448                .iter()
449                .filter_map(|(addr, acc)| {
450                    if acc.token_balances.is_empty() {
451                        return None;
452                    }
453                    let balances = acc
454                        .token_balances
455                        .iter()
456                        .map(|(token_addr, ab)| (token_addr.clone(), ab.balance.clone()))
457                        .collect::<HashMap<Bytes, Bytes>>();
458                    Some((addr.clone(), balances))
459                })
460                .collect::<AccountBalances>();
461
462            let mut new_components = HashMap::new();
463            let mut count_token_skips = 0;
464            let mut components_to_store = HashMap::new();
465            {
466                let state_guard = self.state.read().await;
467
468                // PROCESS SNAPSHOTS
469                'snapshot_loop: for (id, snapshot) in protocol_msg
470                    .snapshots
471                    .get_states()
472                    .clone()
473                {
474                    // Skip any unsupported pools
475                    if self
476                        .inclusion_filters
477                        .get(protocol.as_str())
478                        .is_some_and(|predicate| !predicate(&snapshot))
479                    {
480                        continue;
481                    }
482
483                    // Construct component from snapshot
484                    let mut component_tokens = Vec::new();
485                    let mut new_tokens_accounts = HashMap::new();
486                    for token in snapshot.component.tokens.clone() {
487                        match state_guard.tokens.get(&token) {
488                            Some(token) => {
489                                component_tokens.push(token.clone());
490
491                                // If the token is not an existing proxy token, we need to add it to
492                                // the simulation engine
493                                let token_address = match bytes_to_address(&token.address) {
494                                    Ok(addr) => addr,
495                                    Err(_) => {
496                                        count_token_skips += 1;
497                                        msg_failed_components.insert(id.clone());
498                                        warn!(
499                                            "Token address could not be decoded {}, ignoring pool {:x?}",
500                                            token.address, id
501                                        );
502                                        continue 'snapshot_loop;
503                                    }
504                                };
505                                // Deploy a proxy account without an implementation set
506                                if !state_guard
507                                    .proxy_token_addresses
508                                    .contains_key(&token_address)
509                                {
510                                    new_tokens_accounts.insert(
511                                        token_address,
512                                        create_proxy_token_account(
513                                            token_address,
514                                            None,
515                                            &HashMap::new(),
516                                            snapshot.component.chain,
517                                            None,
518                                        ),
519                                    );
520                                }
521                            }
522                            None => {
523                                count_token_skips += 1;
524                                msg_failed_components.insert(id.clone());
525                                debug!("Token not found {}, ignoring pool {:x?}", token, id);
526                                continue 'snapshot_loop;
527                            }
528                        }
529                    }
530                    let component = ProtocolComponent::from_with_tokens(
531                        snapshot.component.clone(),
532                        component_tokens,
533                    );
534
535                    // Add new tokens to the simulation engine
536                    if !new_tokens_accounts.is_empty() {
537                        update_engine(
538                            SHARED_TYCHO_DB.clone(),
539                            header.clone().block(),
540                            None,
541                            new_tokens_accounts,
542                        )
543                        .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
544                    }
545
546                    // collect contracts:ids mapping for states that should update on contract
547                    // changes (non-manual updates)
548                    if !component
549                        .static_attributes
550                        .contains_key("manual_updates")
551                    {
552                        for contract in &component.contract_ids {
553                            contracts_map
554                                .entry(contract.clone())
555                                .or_insert_with(HashSet::new)
556                                .insert(id.clone());
557                        }
558                        // Add DCI contracts so changes to these contracts trigger
559                        // an update
560                        for (_, tracing) in snapshot.entrypoints.iter() {
561                            for contract in tracing.accessed_slots.keys().cloned() {
562                                contracts_map
563                                    .entry(contract)
564                                    .or_insert_with(HashSet::new)
565                                    .insert(id.clone());
566                            }
567                        }
568                    }
569
570                    // Collect new pairs (components)
571                    new_pairs.insert(id.clone(), component.clone());
572
573                    // Store component for later batch insertion
574                    components_to_store.insert(id.clone(), component);
575
576                    // Construct state from snapshot
577                    if let Some(state_decode_f) = self.registry.get(protocol.as_str()) {
578                        match state_decode_f(
579                            snapshot,
580                            header.clone(),
581                            account_balances.clone(),
582                            self.state.clone(),
583                        )
584                        .await
585                        {
586                            Ok(state) => {
587                                new_components.insert(id.clone(), state);
588                            }
589                            Err(e) => {
590                                if self.skip_state_decode_failures {
591                                    warn!(pool = id, error = %e, "StateDecodingFailure");
592                                    msg_failed_components.insert(id.clone());
593                                    continue 'snapshot_loop;
594                                } else {
595                                    error!(pool = id, error = %e, "StateDecodingFailure");
596                                    return Err(StreamDecodeError::Fatal(format!("{e}")));
597                                }
598                            }
599                        }
600                    } else if self.skip_state_decode_failures {
601                        warn!(pool = id, "MissingDecoderRegistration");
602                        msg_failed_components.insert(id.clone());
603                        continue 'snapshot_loop;
604                    } else {
605                        error!(pool = id, "MissingDecoderRegistration");
606                        return Err(StreamDecodeError::Fatal(format!(
607                            "Missing decoder registration for: {id}"
608                        )));
609                    }
610                }
611            }
612
613            // Batch insert components into state
614            if !components_to_store.is_empty() {
615                let mut state_guard = self.state.write().await;
616                for (id, component) in components_to_store {
617                    state_guard
618                        .components
619                        .insert(id, component);
620                }
621            }
622
623            if !protocol_msg.snapshots.states.is_empty() {
624                info!("Decoded {} snapshots for protocol {protocol}", new_components.len());
625            }
626            if count_token_skips > 0 {
627                info!("Skipped {count_token_skips} pools due to missing tokens");
628            }
629
630            //TODO: should we remove failed components for new_components?
631            updated_states.extend(new_components);
632
633            // PROCESS DELTAS
634            if let Some(deltas) = protocol_msg.deltas.clone() {
635                // Update engine with account changes
636                let mut state_guard = self.state.write().await;
637
638                let mut account_update_by_address: HashMap<Address, AccountUpdate> = HashMap::new();
639                // New proxy token accounts that must overwrite any existing placeholder.
640                let mut new_proxy_accounts: Vec<AccountUpdate> = Vec::new();
641                for (key, value) in deltas.account_deltas.iter() {
642                    let mut update: AccountUpdate = value.clone().into();
643
644                    // TEMP PATCH (ENG-4993)
645                    //
646                    // The indexer may emit Creation deltas with no code for EOA addresses.
647                    // Treat them as EOAs (empty code) rather than downgrading to Update, which
648                    // would skip init_account and cause "uninitialized account" warnings.
649                    if update.code.is_none() && matches!(update.change, ChangeType::Creation) {
650                        error!(
651                            update = ?update,
652                            "FaultyCreationDelta"
653                        );
654                        update.code = Some(vec![]);
655                    }
656
657                    if state_guard.tokens.contains_key(key) {
658                        let original_address = update.address;
659                        // If the account is a token, we need to handle it with a proxy contract.
660                        // Storage updates apply to the proxy contract (at original address).
661                        // Code updates (if any) apply to the token implementation contract (at
662                        // impl_addr).
663
664                        // Handle proxy contract updates
665                        let impl_addr = match state_guard
666                            .proxy_token_addresses
667                            .get(&original_address)
668                        {
669                            Some(impl_addr) => {
670                                // Token already has a proxy contract.
671
672                                // Apply the storage update to proxy contract
673                                let proxy_update = AccountUpdate { code: None, ..update.clone() };
674                                account_update_by_address.insert(original_address, proxy_update);
675
676                                *impl_addr
677                            }
678                            None => {
679                                // Token does not have a proxy contract yet, create one
680
681                                // Assign original token (implementation) contract to new proxy
682                                // address
683                                let impl_addr = generate_proxy_token_address(
684                                    state_guard.proxy_token_addresses.len() as u32,
685                                )?;
686                                state_guard
687                                    .proxy_token_addresses
688                                    .insert(original_address, impl_addr);
689
690                                // Create proxy token account with original account's storage (at
691                                // original address). Track it separately so it can be
692                                // force-overwritten and win over any placeholder that another
693                                // decoder's snapshot loop may have written earlier.
694                                let proxy_state = create_proxy_token_account(
695                                    original_address,
696                                    Some(impl_addr),
697                                    &update.slots,
698                                    update.chain,
699                                    update.balance,
700                                );
701                                new_proxy_accounts.push(proxy_state);
702
703                                impl_addr
704                            }
705                        };
706
707                        // Apply code update to token implementation contract
708                        if update.code.is_some() {
709                            let impl_update = AccountUpdate {
710                                address: impl_addr,
711                                slots: HashMap::new(),
712                                ..update.clone()
713                            };
714                            account_update_by_address.insert(impl_addr, impl_update);
715                        }
716                    } else {
717                        // Not a token, apply update to the account at its original address
718                        account_update_by_address.insert(update.address, update);
719                    }
720                }
721                drop(state_guard);
722
723                let state_guard = self.state.read().await;
724                info!("Updating engine with {} contract deltas", deltas.account_deltas.len());
725                update_engine(
726                    SHARED_TYCHO_DB.clone(),
727                    header.clone().block(),
728                    None,
729                    account_update_by_address,
730                )
731                .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
732
733                // Force-overwrite any newly-created proxy token accounts so they always win
734                // over placeholder entries inserted by other decoders' snapshot loops.
735                if !new_proxy_accounts.is_empty() {
736                    SHARED_TYCHO_DB
737                        .force_update_accounts(new_proxy_accounts)
738                        .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
739                }
740                info!("Engine updated");
741
742                // Collect all pools related to the updated accounts
743                let mut pools_to_update = HashSet::new();
744                for (account, _update) in deltas.account_deltas {
745                    // get new pools related to the account updated
746                    pools_to_update.extend(
747                        contracts_map
748                            .get(&account)
749                            .cloned()
750                            .unwrap_or_default(),
751                    );
752                    // get existing pools related to the account updated
753                    pools_to_update.extend(
754                        state_guard
755                            .contracts_map
756                            .get(&account)
757                            .cloned()
758                            .unwrap_or_default(),
759                    );
760                }
761
762                // Collect all balance changes this block
763                let all_balances = Balances {
764                    component_balances: deltas
765                        .component_balances
766                        .iter()
767                        .map(|(pool_id, bals)| {
768                            let mut balances = HashMap::new();
769                            for (t, b) in bals {
770                                balances.insert(t.clone(), b.balance.clone());
771                            }
772                            pools_to_update.insert(pool_id.clone());
773                            (pool_id.clone(), balances)
774                        })
775                        .collect(),
776                    account_balances: deltas
777                        .account_balances
778                        .iter()
779                        .map(|(account, bals)| {
780                            let mut balances = HashMap::new();
781                            for (t, b) in bals {
782                                balances.insert(t.clone(), b.balance.clone());
783                            }
784                            pools_to_update.extend(
785                                contracts_map
786                                    .get(account)
787                                    .cloned()
788                                    .unwrap_or_default(),
789                            );
790                            (account.clone(), balances)
791                        })
792                        .collect(),
793                };
794
795                // update states with protocol state deltas (attribute changes etc.)
796                for (id, update) in deltas.state_deltas {
797                    // TODO: is this needed?
798                    let update_with_block = Self::add_block_info_to_delta(
799                        ProtocolStateDelta::from(update),
800                        current_block.clone(),
801                    );
802                    match Self::apply_update(
803                        &id,
804                        update_with_block,
805                        &mut updated_states,
806                        &state_guard,
807                        &all_balances,
808                    ) {
809                        Ok(_) => {
810                            pools_to_update.remove(&id);
811                        }
812                        Err(e) => {
813                            if self.skip_state_decode_failures {
814                                warn!(pool = id, error = %e, "Failed to apply state update, marking component as removed");
815                                // Remove from updated_states if it was there
816                                updated_states.remove(&id);
817                                // Try to get component from new_pairs first, then from state
818                                if let Some(component) = new_pairs.remove(&id) {
819                                    removed_pairs.insert(id.clone(), component);
820                                } else if let Some(component) = state_guard.components.get(&id) {
821                                    removed_pairs.insert(id.clone(), component.clone());
822                                } else {
823                                    // Component not found in new_pairs or state, this shouldn't
824                                    // happen
825                                    warn!(pool = id, "Component not found in new_pairs or state, cannot add to removed_pairs");
826                                }
827                                pools_to_update.remove(&id);
828
829                                // Add to failed components
830                                msg_failed_components.insert(id.clone());
831                            } else {
832                                return Err(e);
833                            }
834                        }
835                    }
836                }
837
838                // update remaining pools linked to updated contracts/updated balances
839                for pool in pools_to_update {
840                    // TODO: is this needed?
841                    let default_delta_with_block = Self::add_block_info_to_delta(
842                        ProtocolStateDelta::default(),
843                        current_block.clone(),
844                    );
845                    match Self::apply_update(
846                        &pool,
847                        default_delta_with_block,
848                        &mut updated_states,
849                        &state_guard,
850                        &all_balances,
851                    ) {
852                        Ok(_) => {}
853                        Err(e) => {
854                            if self.skip_state_decode_failures {
855                                warn!(pool = pool, error = %e, "Failed to apply contract/balance update, marking component as removed");
856                                // Remove from updated_states if it was there
857                                updated_states.remove(&pool);
858                                // Try to get component from new_pairs first, then from state
859                                if let Some(component) = new_pairs.remove(&pool) {
860                                    removed_pairs.insert(pool.clone(), component);
861                                } else if let Some(component) = state_guard.components.get(&pool) {
862                                    removed_pairs.insert(pool.clone(), component.clone());
863                                } else {
864                                    // Component not found in new_pairs or state, this shouldn't
865                                    // happen
866                                    warn!(pool = pool, "Component not found in new_pairs or state, cannot add to removed_pairs");
867                                }
868
869                                // Add to failed components
870                                msg_failed_components.insert(pool.clone());
871                            } else {
872                                return Err(e);
873                            }
874                        }
875                    }
876                }
877            };
878        }
879
880        // Persist the newly added/updated states
881        let mut state_guard = self.state.write().await;
882
883        // Update failed components with any new ones
884        state_guard
885            .failed_components
886            .extend(msg_failed_components);
887
888        // Remove any failed components from Updates
889        // Perf: we could do it directly in the decoder logic to avoid some steps, but this logic is
890        // complex and this is more robust.
891        updated_states.retain(|id, _| {
892            !state_guard
893                .failed_components
894                .contains(id)
895        });
896        new_pairs.retain(|id, _| {
897            !state_guard
898                .failed_components
899                .contains(id)
900        });
901
902        state_guard
903            .states
904            .extend(updated_states.clone());
905
906        state_guard.current_block_number = block_number_or_timestamp;
907
908        // Add new components to persistent state
909        for (id, component) in new_pairs.iter() {
910            state_guard
911                .components
912                .insert(id.clone(), component.clone());
913        }
914
915        // Remove components from persistent state
916        for id in removed_pairs.keys() {
917            state_guard.components.remove(id);
918        }
919
920        for (key, values) in contracts_map {
921            state_guard
922                .contracts_map
923                .entry(key)
924                .or_insert_with(HashSet::new)
925                .extend(values);
926        }
927
928        // Send the tick with all updated states
929        Ok(Update::new(block_number_or_timestamp, updated_states, new_pairs)
930            .set_is_partial(is_partial)
931            .set_removed_pairs(removed_pairs)
932            .set_sync_states(msg.sync_states.clone()))
933    }
934
935    /// Applies pending deltas from one or more `TxDeltaIndexer`s against the current confirmed
936    /// state and returns an ephemeral `Update`.
937    ///
938    /// This is the read-only counterpart of `decode()`. It clones pool states, applies the
939    /// supplied `pending_deltas`, and returns the result — **without writing back** to
940    /// `DecoderState`. Calling this method twice with the same input produces identical results.
941    ///
942    /// Only native protocols are supported. VM protocols (extractor prefix `"vm:"`) are rejected
943    /// at registration time in
944    /// [`with_pending_indexer`](crate::evm::stream::ProtocolStreamBuilder::with_pending_indexer).
945    ///
946    /// # Parameters
947    /// * `pending_deltas` — map from extractor name to the `BlockAggregatedChanges` produced by the
948    ///   corresponding `TxDeltaIndexer::generate_deltas()` call.
949    /// * `header` — the target block header. Its `block_number_or_timestamp()` is stamped on the
950    ///   returned [`Update`]; its `block_number` and `block_timestamp` are injected into each state
951    ///   delta so that protocols relying on block context (e.g. aerodrome slipstreams, etherfi)
952    ///   receive correct values.
953    pub async fn apply_deltas_ephemeral(
954        &self,
955        pending_deltas: &HashMap<String, BlockAggregatedChanges>,
956        header: H,
957    ) -> Result<Update, StreamDecodeError> {
958        let block_number_or_timestamp = header
959            .clone()
960            .block_number_or_timestamp();
961        let current_block = header.block();
962        let state_guard = self.state.read().await;
963
964        let mut updated_states: HashMap<String, Box<dyn ProtocolSim>> = HashMap::new();
965
966        for deltas in pending_deltas.values() {
967            let all_balances = Balances {
968                component_balances: deltas
969                    .component_balances
970                    .iter()
971                    .map(|(pool_id, bals)| {
972                        let balances = bals
973                            .iter()
974                            .map(|(t, b)| (t.clone(), b.balance.clone()))
975                            .collect();
976                        (pool_id.clone(), balances)
977                    })
978                    .collect(),
979                account_balances: HashMap::new(),
980            };
981
982            for (id, state_delta) in &deltas.state_deltas {
983                let dto_delta = Self::add_block_info_to_delta(
984                    ProtocolStateDelta::from(state_delta.clone()),
985                    current_block.clone(),
986                );
987                if let Err(e) = Self::apply_update(
988                    id,
989                    dto_delta,
990                    &mut updated_states,
991                    &state_guard,
992                    &all_balances,
993                ) {
994                    warn!(pool = id, error = %e, "EphemeralDeltaTransitionError");
995                }
996            }
997        }
998
999        Ok(Update::new(block_number_or_timestamp, updated_states, HashMap::new()))
1000    }
1001
1002    /// Add current block information (number and timestamp) to a ProtocolStateDelta.
1003    fn add_block_info_to_delta(
1004        mut delta: ProtocolStateDelta,
1005        block_header_opt: Option<BlockHeader>,
1006    ) -> ProtocolStateDelta {
1007        if let Some(header) = block_header_opt {
1008            // Add block_number and block_timestamp attributes to ensure pool states
1009            // receive current block information during delta_transition
1010            delta.updated_attributes.insert(
1011                "block_number".to_string(),
1012                Bytes::from(header.number.to_be_bytes().to_vec()),
1013            );
1014            delta.updated_attributes.insert(
1015                "block_timestamp".to_string(),
1016                Bytes::from(header.timestamp.to_be_bytes().to_vec()),
1017            );
1018        }
1019        delta
1020    }
1021
1022    fn apply_update(
1023        id: &String,
1024        update: ProtocolStateDelta,
1025        updated_states: &mut HashMap<String, Box<dyn ProtocolSim>>,
1026        state_guard: &RwLockReadGuard<'_, DecoderState>,
1027        all_balances: &Balances,
1028    ) -> Result<(), StreamDecodeError> {
1029        match updated_states.entry(id.clone()) {
1030            Entry::Occupied(mut entry) => {
1031                // If state exists in updated_states, apply the delta to it
1032                let state: &mut Box<dyn ProtocolSim> = entry.get_mut();
1033                state
1034                    .delta_transition(update, &state_guard.tokens, all_balances)
1035                    .map_err(|e| {
1036                        error!(pool = id, error = ?e, "DeltaTransitionError");
1037                        StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
1038                    })?;
1039            }
1040            Entry::Vacant(_) => {
1041                match state_guard.states.get(id) {
1042                    // If state does not exist in updated_states, apply the delta to the stored
1043                    // state
1044                    Some(stored_state) => {
1045                        let mut state = stored_state.clone();
1046                        state
1047                            .delta_transition(update, &state_guard.tokens, all_balances)
1048                            .map_err(|e| {
1049                                error!(pool = id, error = ?e, "DeltaTransitionError");
1050                                StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
1051                            })?;
1052                        updated_states.insert(id.clone(), state);
1053                    }
1054                    None => debug!(pool = id, reason = "MissingState", "DeltaTransitionError"),
1055                }
1056            }
1057        }
1058        Ok(())
1059    }
1060}
1061
1062/// Generate a proxy token address for a given token index
1063fn generate_proxy_token_address(idx: u32) -> Result<Address, StreamDecodeError> {
1064    let padded_idx = format!("{idx:x}");
1065    let padded_zeroes = "0".repeat(33 - padded_idx.len());
1066    let proxy_token_address = format!("{padded_zeroes}{padded_idx}BAdbaBe");
1067    let decoded = hex::decode(proxy_token_address).map_err(|e| {
1068        StreamDecodeError::Fatal(format!("Invalid proxy token address encoding: {e}"))
1069    })?;
1070
1071    const ADDRESS_LENGTH: usize = 20;
1072    if decoded.len() != ADDRESS_LENGTH {
1073        return Err(StreamDecodeError::Fatal(format!(
1074            "Invalid proxy token address length: expected {}, got {}",
1075            ADDRESS_LENGTH,
1076            decoded.len(),
1077        )));
1078    }
1079
1080    Ok(Address::from_slice(&decoded))
1081}
1082
1083/// Create a proxy token account for a token at a given address
1084///
1085/// The proxy token account is created at the original token address and points to the new token
1086/// address.
1087fn create_proxy_token_account(
1088    addr: Address,
1089    new_address: Option<Address>,
1090    storage: &HashMap<U256, U256>,
1091    chain: Chain,
1092    balance: Option<U256>,
1093) -> AccountUpdate {
1094    let mut slots = storage.clone();
1095    if let Some(new_address) = new_address {
1096        slots.insert(*IMPLEMENTATION_SLOT, U256::from_be_slice(new_address.as_slice()));
1097    }
1098
1099    AccountUpdate {
1100        address: addr,
1101        chain,
1102        slots,
1103        balance,
1104        code: Some(ERC20_PROXY_BYTECODE.to_vec()),
1105        change: ChangeType::Creation,
1106    }
1107}
1108
// Test-only mock of a protocol state, generated with `mockall::mock!`.
//
// The methods below are declared as inherent methods of the generated `MockProtocolSim`
// struct. The `impl ProtocolSim for MockProtocolSim` block in this file forwards the trait
// methods of the same name to them, which lets tests set expectations such as
// `expect_clone_box()` or `expect_delta_transition()` on a state handled by the decoder.
#[cfg(test)]
mock! {
    #[derive(Debug)]
    pub ProtocolSim {
        pub fn fee(&self) -> f64;
        pub fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError>;
        pub fn get_amount_out(
            &self,
            amount_in: BigUint,
            token_in: &Token,
            token_out: &Token,
        ) -> Result<GetAmountOutResult, SimulationError>;
        pub fn get_limits(
            &self,
            sell_token: Bytes,
            buy_token: Bytes,
        ) -> Result<(BigUint, BigUint), SimulationError>;
        pub fn delta_transition(
            &mut self,
            delta: ProtocolStateDelta,
            tokens: &HashMap<Bytes, Token>,
            balances: &Balances,
        ) -> Result<(), TransitionError>;
        // Backs `Clone` for boxed states; the decoder clones a stored state before applying a
        // delta to it.
        pub fn clone_box(&self) -> Box<dyn ProtocolSim>;
        pub fn eq(&self, other: &dyn ProtocolSim) -> bool;
    }
}
1136
// NOTE(review): presumably provides the (non-)serialization impls required for the mock, using
// "test protocol" as its label — confirm against the macro definition.
#[cfg(test)]
crate::impl_non_serializable_protocol!(MockProtocolSim, "test protocol");
1139
// `ProtocolSim` implementation for the test mock.
//
// Each simulation method forwards to the inherent method of the same name that `mock!` declares
// on `MockProtocolSim`. The calls are written with an explicit path to make clear that they
// target the inherent (mocked) method, which takes precedence over the trait method, rather
// than recursing into this impl.
#[cfg(test)]
impl ProtocolSim for MockProtocolSim {
    fn fee(&self) -> f64 {
        MockProtocolSim::fee(self)
    }

    fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError> {
        MockProtocolSim::spot_price(self, base, quote)
    }

    fn get_amount_out(
        &self,
        amount_in: BigUint,
        token_in: &Token,
        token_out: &Token,
    ) -> Result<GetAmountOutResult, SimulationError> {
        MockProtocolSim::get_amount_out(self, amount_in, token_in, token_out)
    }

    fn get_limits(
        &self,
        sell_token: Bytes,
        buy_token: Bytes,
    ) -> Result<(BigUint, BigUint), SimulationError> {
        MockProtocolSim::get_limits(self, sell_token, buy_token)
    }

    fn delta_transition(
        &mut self,
        delta: ProtocolStateDelta,
        tokens: &HashMap<Bytes, Token>,
        balances: &Balances,
    ) -> Result<(), TransitionError> {
        MockProtocolSim::delta_transition(self, delta, tokens, balances)
    }

    fn clone_box(&self) -> Box<dyn ProtocolSim> {
        MockProtocolSim::clone_box(self)
    }

    // Downcasting is not mocked: any test reaching these panics.
    fn as_any(&self) -> &dyn Any {
        panic!("MockProtocolSim does not support as_any")
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        panic!("MockProtocolSim does not support as_any_mut")
    }

    fn eq(&self, other: &dyn ProtocolSim) -> bool {
        MockProtocolSim::eq(self, other)
    }

    // The typetag hooks are not expected to be called on the mock.
    fn typetag_name(&self) -> &'static str {
        unreachable!()
    }

    fn typetag_deserialize(&self) {
        unreachable!()
    }
}
1200
1201#[cfg(test)]
1202mod tests {
1203    use std::str::FromStr;
1204
1205    use alloy::primitives::address;
1206    use mockall::predicate::*;
1207    use rstest::*;
1208    use tycho_client::feed::BlockHeader;
1209    use tycho_common::{models::Chain, Bytes};
1210
1211    use super::*;
1212    use crate::evm::protocol::uniswap_v2::state::UniswapV2State;
1213
1214    async fn setup_decoder(set_tokens: bool) -> TychoStreamDecoder<BlockHeader> {
1215        let mut decoder = TychoStreamDecoder::new();
1216        decoder.register_decoder::<UniswapV2State>("uniswap_v2");
1217        if set_tokens {
1218            let tokens = [
1219                Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0),
1220                Bytes::from("0xdac17f958d2ee523a2206206994597c13d831ec7").lpad(20, 0),
1221            ]
1222            .iter()
1223            .map(|addr| {
1224                let addr_str = format!("{addr:x}");
1225                (
1226                    addr.clone(),
1227                    Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1228                )
1229            })
1230            .collect();
1231            decoder.set_tokens(tokens).await;
1232        }
1233        decoder
1234    }
1235
1236    fn load_test_msg(name: &str) -> FeedMessage<BlockHeader> {
1237        use std::{fs, path::Path};
1238
1239        use tycho_client::feed::dto;
1240        let project_root = env!("CARGO_MANIFEST_DIR");
1241        let asset_path = Path::new(project_root).join(format!("tests/assets/decoder/{name}.json"));
1242        let json_data = fs::read_to_string(asset_path).expect("Failed to read test asset");
1243        let feed_msg: dto::FeedMessage<BlockHeader> =
1244            serde_json::from_str(&json_data).expect("Failed to deserialize FeedMsg json!");
1245        FeedMessage::from(feed_msg)
1246    }
1247
1248    #[tokio::test]
1249    async fn test_decode() {
1250        let decoder = setup_decoder(true).await;
1251
1252        let msg = load_test_msg("uniswap_v2_snapshot");
1253        let res1 = decoder
1254            .decode(&msg)
1255            .await
1256            .expect("decode failure");
1257        let msg = load_test_msg("uniswap_v2_delta");
1258        let res2 = decoder
1259            .decode(&msg)
1260            .await
1261            .expect("decode failure");
1262
1263        assert_eq!(res1.states.len(), 1);
1264        assert_eq!(res2.states.len(), 1);
1265        assert_eq!(res1.sync_states.len(), 1);
1266        assert_eq!(res2.sync_states.len(), 1);
1267    }
1268
1269    #[tokio::test]
1270    async fn test_decode_component_missing_token() {
1271        let decoder = setup_decoder(false).await;
1272        let tokens = [Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0)]
1273            .iter()
1274            .map(|addr| {
1275                let addr_str = format!("{addr:x}");
1276                (
1277                    addr.clone(),
1278                    Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1279                )
1280            })
1281            .collect();
1282        decoder.set_tokens(tokens).await;
1283
1284        let msg = load_test_msg("uniswap_v2_snapshot");
1285        let res1 = decoder
1286            .decode(&msg)
1287            .await
1288            .expect("decode failure");
1289
1290        assert_eq!(res1.states.len(), 0);
1291    }
1292
1293    #[tokio::test]
1294    async fn test_decode_component_bad_id() {
1295        let decoder = setup_decoder(true).await;
1296        let msg = load_test_msg("uniswap_v2_snapshot_broken_id");
1297
1298        match decoder.decode(&msg).await {
1299            Err(StreamDecodeError::Fatal(msg)) => {
1300                assert_eq!(msg, "Component id mismatch");
1301            }
1302            Ok(_) => {
1303                panic!("Expected failures to be raised")
1304            }
1305        }
1306    }
1307
1308    #[rstest]
1309    #[case(true)]
1310    #[case(false)]
1311    #[tokio::test]
1312    async fn test_decode_component_bad_state(#[case] skip_failures: bool) {
1313        let mut decoder = setup_decoder(true).await;
1314        decoder.skip_state_decode_failures = skip_failures;
1315
1316        let msg = load_test_msg("uniswap_v2_snapshot_broken_state");
1317        match decoder.decode(&msg).await {
1318            Err(StreamDecodeError::Fatal(msg)) => {
1319                if !skip_failures {
1320                    assert_eq!(msg, "Missing attributes reserve0");
1321                } else {
1322                    panic!("Expected failures to be ignored. Err: {msg}")
1323                }
1324            }
1325            Ok(res) => {
1326                if !skip_failures {
1327                    panic!("Expected failures to be raised")
1328                } else {
1329                    assert_eq!(res.states.len(), 0);
1330                }
1331            }
1332        }
1333    }
1334
1335    #[tokio::test]
1336    async fn test_decode_updates_state_on_contract_change() {
1337        let decoder = setup_decoder(true).await;
1338
1339        // Create the mock instances
1340        let mut mock_state = MockProtocolSim::new();
1341
1342        mock_state
1343            .expect_clone_box()
1344            .times(1)
1345            .returning(|| {
1346                let mut cloned_mock_state = MockProtocolSim::new();
1347                // Expect `delta_transition` to be called once with any parameters
1348                cloned_mock_state
1349                    .expect_delta_transition()
1350                    .times(1)
1351                    .returning(|_, _, _| Ok(()));
1352                cloned_mock_state
1353                    .expect_clone_box()
1354                    .times(1)
1355                    .returning(|| Box::new(MockProtocolSim::new()));
1356                Box::new(cloned_mock_state)
1357            });
1358
1359        // Insert mock state into `updated_states`
1360        let pool_id =
1361            "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2".to_string();
1362        decoder
1363            .state
1364            .write()
1365            .await
1366            .states
1367            .insert(pool_id.clone(), Box::new(mock_state) as Box<dyn ProtocolSim>);
1368        decoder
1369            .state
1370            .write()
1371            .await
1372            .contracts_map
1373            .insert(
1374                Bytes::from("0xba12222222228d8ba445958a75a0704d566bf2c8").lpad(20, 0),
1375                HashSet::from([pool_id.clone()]),
1376            );
1377
1378        // Load a test message containing a contract update
1379        let msg = load_test_msg("balancer_v2_delta");
1380
1381        // Decode the message
1382        let _ = decoder
1383            .decode(&msg)
1384            .await
1385            .expect("decode failure");
1386
1387        // The mock framework will assert that `delta_transition` was called exactly once
1388    }
1389
1390    #[test]
1391    fn test_generate_proxy_token_address() {
1392        let idx = 1;
1393        let generated_address =
1394            generate_proxy_token_address(idx).expect("proxy token address should be valid");
1395        assert_eq!(generated_address, address!("000000000000000000000000000000001badbabe"));
1396
1397        let idx = 123456;
1398        let generated_address =
1399            generate_proxy_token_address(idx).expect("proxy token address should be valid");
1400        assert_eq!(generated_address, address!("00000000000000000000000000001e240badbabe"));
1401    }
1402
1403    #[tokio::test(flavor = "multi_thread")]
1404    async fn test_euler_hook_low_pool_manager_balance() {
1405        let mut decoder = TychoStreamDecoder::new();
1406
1407        decoder.register_decoder_with_context::<crate::evm::protocol::uniswap_v4::state::UniswapV4State>(
1408            "uniswap_v4_hooks", DecoderContext::new().vm_traces(true)
1409        );
1410
1411        let weth = Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap();
1412        let teth = Bytes::from_str("0xd11c452fc99cf405034ee446803b6f6c1f6d5ed8").unwrap();
1413        let tokens = HashMap::from([
1414            (
1415                weth.clone(),
1416                Token::new(&weth, "WETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1417            ),
1418            (
1419                teth.clone(),
1420                Token::new(&teth, "tETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1421            ),
1422        ]);
1423
1424        decoder.set_tokens(tokens.clone()).await;
1425
1426        let msg = load_test_msg("euler_hook_snapshot");
1427        let res = decoder
1428            .decode(&msg)
1429            .await
1430            .expect("decode failure");
1431
1432        let pool_state = res
1433            .states
1434            .get("0xc70d7fbd7fcccdf726e02fed78548b40dc52502b097c7a1ee7d995f4d4396134")
1435            .expect("Couldn't find target pool");
1436        let amount_out = pool_state
1437            .get_amount_out(
1438                BigUint::from_str("1000000000000000000").unwrap(),
1439                tokens.get(&teth).unwrap(),
1440                tokens.get(&weth).unwrap(),
1441            )
1442            .expect("Get amount out failed");
1443
1444        assert_eq!(amount_out.amount, BigUint::from_str("1216190190361759119").unwrap());
1445    }
1446}