tycho_simulation/evm/
decoder.rs

1use std::{
2    collections::{hash_map::Entry, HashMap, HashSet},
3    future::Future,
4    pin::Pin,
5    sync::Arc,
6};
7
8use alloy::primitives::{Address, U256};
9use thiserror::Error;
10use tokio::sync::{RwLock, RwLockReadGuard};
11use tracing::{debug, error, info, warn};
12use tycho_client::feed::{synchronizer::ComponentWithState, BlockHeader, FeedMessage, HeaderLike};
13use tycho_common::{
14    dto::{ChangeType, ProtocolStateDelta},
15    models::{token::Token, Chain},
16    simulation::protocol_sim::{Balances, ProtocolSim},
17    Bytes,
18};
19#[cfg(test)]
20use {
21    mockall::mock,
22    num_bigint::BigUint,
23    std::any::Any,
24    tycho_common::simulation::{
25        errors::{SimulationError, TransitionError},
26        protocol_sim::GetAmountOutResult,
27    },
28};
29
30use crate::{
31    evm::{
32        engine_db::{update_engine, SHARED_TYCHO_DB},
33        protocol::{
34            utils::bytes_to_address,
35            vm::{constants::ERC20_PROXY_BYTECODE, erc20_token::IMPLEMENTATION_SLOT},
36        },
37        tycho_models::{AccountUpdate, ResponseAccount},
38    },
39    protocol::{
40        errors::InvalidSnapshotError,
41        models::{DecoderContext, ProtocolComponent, TryFromWithBlock, Update},
42    },
43};
44
45#[derive(Error, Debug)]
46pub enum StreamDecodeError {
47    #[error("{0}")]
48    Fatal(String),
49}
50
51#[derive(Default)]
52struct DecoderState {
53    tokens: HashMap<Bytes, Token>,
54    states: HashMap<String, Box<dyn ProtocolSim>>,
55    components: HashMap<String, ProtocolComponent>,
56    // maps contract address to the pools they affect
57    contracts_map: HashMap<Bytes, HashSet<String>>,
58    // Maps original token address to their new proxy token address
59    proxy_token_addresses: HashMap<Address, Address>,
60    // Set of failed components, these are components that failed to decode and will not be emitted
61    // again TODO: handle more gracefully inside tycho-client. We could fetch the snapshot and
62    // try to decode it again.
63    failed_components: HashSet<String>,
64}
65
66type DecodeFut =
67    Pin<Box<dyn Future<Output = Result<Box<dyn ProtocolSim>, InvalidSnapshotError>> + Send + Sync>>;
68type AccountBalances = HashMap<Bytes, HashMap<Bytes, Bytes>>;
69type RegistryFn<H> = dyn Fn(ComponentWithState, H, AccountBalances, Arc<RwLock<DecoderState>>) -> DecodeFut
70    + Send
71    + Sync;
72type FilterFn = fn(&ComponentWithState) -> bool;
73
74/// A decoder to process raw messages.
75///
76/// This struct decodes incoming messages of type `FeedMessage` and converts it into the
77/// `BlockUpdate` struct.
78///
79/// # Important:
80/// - Supports registering exchanges and their associated filters for specific protocol components.
81/// - Allows the addition of client-side filters for custom conditions.
82///
83/// **Note:** The tokens provided during configuration will be used for decoding, ensuring
84/// efficient handling of protocol components. Protocol components containing tokens which are not
85/// included in this initial list, or added when applying deltas, will not be decoded.
86pub struct TychoStreamDecoder<H>
87where
88    H: HeaderLike,
89{
90    state: Arc<RwLock<DecoderState>>,
91    skip_state_decode_failures: bool,
92    min_token_quality: u32,
93    registry: HashMap<String, Box<RegistryFn<H>>>,
94    inclusion_filters: HashMap<String, FilterFn>,
95}
96
97impl<H> Default for TychoStreamDecoder<H>
98where
99    H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
100{
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106impl<H> TychoStreamDecoder<H>
107where
108    H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
109{
110    pub fn new() -> Self {
111        Self {
112            state: Arc::new(RwLock::new(DecoderState::default())),
113            skip_state_decode_failures: false,
114            min_token_quality: 51,
115            registry: HashMap::new(),
116            inclusion_filters: HashMap::new(),
117        }
118    }
119
120    /// Sets the currently known tokens which will be considered during decoding.
121    ///
122    /// Protocol components containing tokens which are not included in this initial list, or
123    /// added when applying deltas, will not be decoded.
124    pub async fn set_tokens(&self, tokens: HashMap<Bytes, Token>) {
125        let mut guard = self.state.write().await;
126        guard.tokens = tokens;
127    }
128
129    pub fn skip_state_decode_failures(&mut self, skip: bool) {
130        self.skip_state_decode_failures = skip;
131    }
132
133    /// Registers a decoder for a given exchange with a decoder context.
134    ///
135    /// This method maps an exchange identifier to a specific protocol simulation type.
136    /// The associated type must implement the `TryFromWithBlock` trait to enable decoding
137    /// of state updates from `ComponentWithState` objects. This allows the decoder to transform
138    /// the component data into the appropriate protocol simulation type based on the current
139    /// blockchain state and the provided block header.
140    /// For example, to register a decoder for the `uniswap_v2` exchange with an additional decoder
141    /// context, you must call this function with
142    /// `register_decoder_with_context::<UniswapV2State>("uniswap_v2", context)`.
143    /// This ensures that the exchange ID `uniswap_v2` is properly associated with the
144    /// `UniswapV2State` decoder for use in the protocol stream.
145    pub fn register_decoder_with_context<T>(&mut self, exchange: &str, context: DecoderContext)
146    where
147        T: ProtocolSim
148            + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
149            + Send
150            + 'static,
151    {
152        let decoder = Box::new(
153            move |component: ComponentWithState,
154                  header: H,
155                  account_balances: AccountBalances,
156                  state: Arc<RwLock<DecoderState>>| {
157                let context = context.clone();
158                Box::pin(async move {
159                    let guard = state.read().await;
160                    T::try_from_with_header(
161                        component,
162                        header,
163                        &account_balances,
164                        &guard.tokens,
165                        &context,
166                    )
167                    .await
168                    .map(|c| Box::new(c) as Box<dyn ProtocolSim>)
169                }) as DecodeFut
170            },
171        );
172        self.registry
173            .insert(exchange.to_string(), decoder);
174    }
175
176    /// Registers a decoder for a given exchange.
177    ///
178    /// This method maps an exchange identifier to a specific protocol simulation type.
179    /// The associated type must implement the `TryFromWithBlock` trait to enable decoding
180    /// of state updates from `ComponentWithState` objects. This allows the decoder to transform
181    /// the component data into the appropriate protocol simulation type based on the current
182    /// blockchain state and the provided block header.
183    /// For example, to register a decoder for the `uniswap_v2` exchange, you must call
184    /// this function with `register_decoder::<UniswapV2State>("uniswap_v2", vm_attributes)`.
185    /// This ensures that the exchange ID `uniswap_v2` is properly associated with the
186    /// `UniswapV2State` decoder for use in the protocol stream.
187    pub fn register_decoder<T>(&mut self, exchange: &str)
188    where
189        T: ProtocolSim
190            + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
191            + Send
192            + 'static,
193    {
194        let context = DecoderContext::new();
195        self.register_decoder_with_context::<T>(exchange, context);
196    }
197
198    /// Registers a client-side filter function for a given exchange.
199    ///
200    /// Associates a filter function with an exchange ID, enabling custom filtering of protocol
201    /// components. The filter function is applied client-side to refine the data received from the
202    /// stream. It can be used to exclude certain components based on attributes or conditions that
203    /// are not supported by the server-side filtering logic. This is particularly useful for
204    /// implementing custom behaviors, such as:
205    /// - Filtering out pools with specific attributes (e.g., unsupported features).
206    /// - Blacklisting pools based on custom criteria.
207    /// - Excluding pools that do not meet certain requirements (e.g., token pairs or liquidity
208    ///   constraints).
209    ///
210    /// For example, you might use a filter to exclude pools that are not fully supported in the
211    /// protocol, or to ignore pools with certain attributes that are irrelevant to your
212    /// application.
213    pub fn register_filter(&mut self, exchange: &str, predicate: FilterFn) {
214        self.inclusion_filters
215            .insert(exchange.to_string(), predicate);
216    }
217
218    /// Decodes a `FeedMessage` into a `BlockUpdate` containing the updated states of protocol
219    /// components
220    pub async fn decode(&self, msg: &FeedMessage<H>) -> Result<Update, StreamDecodeError> {
221        // stores all states updated in this tick/msg
222        let mut updated_states = HashMap::new();
223        let mut new_pairs = HashMap::new();
224        let mut removed_pairs = HashMap::new();
225        let mut contracts_map = HashMap::new();
226        let mut msg_failed_components = HashSet::new();
227
228        let header = msg
229            .state_msgs
230            .values()
231            .next()
232            .ok_or_else(|| StreamDecodeError::Fatal("Missing block!".into()))?
233            .header
234            .clone();
235
236        let block_number_or_timestamp = header
237            .clone()
238            .block_number_or_timestamp();
239        let current_block = header.clone().block();
240
241        for (protocol, protocol_msg) in msg.state_msgs.iter() {
242            // Add any new tokens
243            if let Some(deltas) = protocol_msg.deltas.as_ref() {
244                let mut state_guard = self.state.write().await;
245
246                let new_tokens = deltas
247                    .new_tokens
248                    .iter()
249                    .filter(|(addr, t)| {
250                        t.quality >= self.min_token_quality &&
251                            !state_guard.tokens.contains_key(*addr)
252                    })
253                    .filter_map(|(addr, t)| {
254                        t.clone()
255                            .try_into()
256                            .map(|token| (addr.clone(), token))
257                            .inspect_err(|e| {
258                                warn!("Failed decoding token {e:?} {addr:#044x}");
259                                *e
260                            })
261                            .ok()
262                    })
263                    .collect::<HashMap<Bytes, Token>>();
264
265                if !new_tokens.is_empty() {
266                    debug!(n = new_tokens.len(), "NewTokens");
267                    state_guard.tokens.extend(new_tokens);
268                }
269            }
270
271            // Remove untracked components
272            {
273                let mut state_guard = self.state.write().await;
274                let removed_components: Vec<(String, ProtocolComponent)> = protocol_msg
275                    .removed_components
276                    .iter()
277                    .map(|(id, comp)| {
278                        if *id != comp.id {
279                            error!(
280                                "Component id mismatch in removed components {id} != {}",
281                                comp.id
282                            );
283                            return Err(StreamDecodeError::Fatal("Component id mismatch".into()));
284                        }
285
286                        let tokens = comp
287                            .tokens
288                            .iter()
289                            .flat_map(|addr| state_guard.tokens.get(addr).cloned())
290                            .collect::<Vec<_>>();
291
292                        if tokens.len() == comp.tokens.len() {
293                            Ok(Some((
294                                id.clone(),
295                                ProtocolComponent::from_with_tokens(comp.clone(), tokens),
296                            )))
297                        } else {
298                            Ok(None)
299                        }
300                    })
301                    .collect::<Result<Vec<Option<(String, ProtocolComponent)>>, StreamDecodeError>>(
302                    )?
303                    .into_iter()
304                    .flatten()
305                    .collect();
306
307                // Remove components from state and add to removed_pairs
308                for (id, component) in removed_components {
309                    state_guard.components.remove(&id);
310                    state_guard.states.remove(&id);
311                    removed_pairs.insert(id, component);
312                }
313
314                // UPDATE VM STORAGE
315                info!(
316                    "Processing {} contracts from snapshots",
317                    protocol_msg
318                        .snapshots
319                        .get_vm_storage()
320                        .len()
321                );
322
323                let mut proxy_token_accounts: HashMap<Address, AccountUpdate> = HashMap::new();
324                let mut storage_by_address: HashMap<Address, ResponseAccount> = HashMap::new();
325                for (key, value) in protocol_msg
326                    .snapshots
327                    .get_vm_storage()
328                    .iter()
329                {
330                    let account: ResponseAccount = value.clone().into();
331
332                    if state_guard.tokens.contains_key(key) {
333                        let original_address = account.address;
334                        // To work with Tycho's token overwrites system, if we get account
335                        // snapshots for a token we must handle them with a proxy/wrapper
336                        // contract.
337                        // Note: storage for the original contract must be set at the proxy
338                        // contract address. This is because the proxy contract uses
339                        // delegatecall to the original (implementation) contract.
340
341                        // Handle proxy token accounts
342                        let (impl_addr, proxy_state) = match state_guard
343                            .proxy_token_addresses
344                            .get(&original_address)
345                        {
346                            Some(impl_addr) => {
347                                // Token already has a proxy contract, simply update it.
348
349                                // Note: we apply the snapshot as an update. This is to cover the
350                                // case where a contract may be stale as it stopped being tracked
351                                // for some reason (e.g. due to a drop in tvl) and is now being
352                                // tracked again.
353                                let proxy_state = AccountUpdate::new(
354                                    original_address,
355                                    value.chain.into(),
356                                    account.slots.clone(),
357                                    Some(account.native_balance),
358                                    None,
359                                    ChangeType::Update,
360                                );
361                                (*impl_addr, proxy_state)
362                            }
363                            None => {
364                                // Token does not have a proxy contract yet, create one
365
366                                // Assign original token contract to new address
367                                let impl_addr = generate_proxy_token_address(
368                                    state_guard.proxy_token_addresses.len() as u32,
369                                )?;
370                                state_guard
371                                    .proxy_token_addresses
372                                    .insert(original_address, impl_addr);
373
374                                // Add proxy token contract at original token address
375                                let proxy_state = create_proxy_token_account(
376                                    original_address,
377                                    Some(impl_addr),
378                                    &account.slots,
379                                    value.chain.into(),
380                                    Some(account.native_balance),
381                                );
382
383                                (impl_addr, proxy_state)
384                            }
385                        };
386
387                        proxy_token_accounts.insert(original_address, proxy_state);
388
389                        // Assign original token contract to the implementation address
390                        let impl_update = ResponseAccount {
391                            address: impl_addr,
392                            slots: HashMap::new(),
393                            ..account.clone()
394                        };
395                        storage_by_address.insert(impl_addr, impl_update);
396                    } else {
397                        // Not a token, apply snapshot to the account at its original address
398                        storage_by_address.insert(account.address, account);
399                    }
400                }
401
402                info!("Updating engine with {} contracts from snapshots", storage_by_address.len());
403                update_engine(
404                    SHARED_TYCHO_DB.clone(),
405                    header.clone().block(),
406                    Some(storage_by_address),
407                    proxy_token_accounts,
408                )
409                .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
410                info!("Engine updated");
411                drop(state_guard);
412            }
413
414            // Construct a contract to token balances map: HashMap<ContractAddress,
415            // HashMap<TokenAddress, Balance>>
416            let account_balances = protocol_msg
417                .clone()
418                .snapshots
419                .get_vm_storage()
420                .iter()
421                .filter_map(|(addr, acc)| {
422                    let balances = acc.token_balances.clone();
423                    if balances.is_empty() {
424                        return None;
425                    }
426                    Some((addr.clone(), balances))
427                })
428                .collect::<AccountBalances>();
429
430            let mut new_components = HashMap::new();
431            let mut count_token_skips = 0;
432            let mut components_to_store = HashMap::new();
433            {
434                let state_guard = self.state.read().await;
435
436                // PROCESS SNAPSHOTS
437                'snapshot_loop: for (id, snapshot) in protocol_msg
438                    .snapshots
439                    .get_states()
440                    .clone()
441                {
442                    // Skip any unsupported pools
443                    if self
444                        .inclusion_filters
445                        .get(protocol.as_str())
446                        .is_some_and(|predicate| !predicate(&snapshot))
447                    {
448                        continue;
449                    }
450
451                    // Construct component from snapshot
452                    let mut component_tokens = Vec::new();
453                    let mut new_tokens_accounts = HashMap::new();
454                    for token in snapshot.component.tokens.clone() {
455                        match state_guard.tokens.get(&token) {
456                            Some(token) => {
457                                component_tokens.push(token.clone());
458
459                                // If the token is not an existing proxy token, we need to add it to
460                                // the simulation engine
461                                let token_address = match bytes_to_address(&token.address) {
462                                    Ok(addr) => addr,
463                                    Err(_) => {
464                                        warn!(
465                                            "Token address could not be decoded {}, ignoring pool {:x?}",
466                                            token.address, id
467                                        );
468                                        continue 'snapshot_loop;
469                                    }
470                                };
471                                // Deploy a proxy account without an implementation set
472                                if !state_guard
473                                    .proxy_token_addresses
474                                    .contains_key(&token_address)
475                                {
476                                    new_tokens_accounts.insert(
477                                        token_address,
478                                        create_proxy_token_account(
479                                            token_address,
480                                            None,
481                                            &HashMap::new(),
482                                            snapshot.component.chain.into(),
483                                            None,
484                                        ),
485                                    );
486                                }
487                            }
488                            None => {
489                                count_token_skips += 1;
490                                debug!("Token not found {}, ignoring pool {:x?}", token, id);
491                                continue 'snapshot_loop;
492                            }
493                        }
494                    }
495                    let component = ProtocolComponent::from_with_tokens(
496                        snapshot.component.clone(),
497                        component_tokens,
498                    );
499
500                    // Add new tokens to the simulation engine
501                    if !new_tokens_accounts.is_empty() {
502                        update_engine(
503                            SHARED_TYCHO_DB.clone(),
504                            header.clone().block(),
505                            None,
506                            new_tokens_accounts,
507                        )
508                        .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
509                    }
510
511                    // collect contracts:ids mapping for states that should update on contract
512                    // changes (non-manual updates)
513                    if !component
514                        .static_attributes
515                        .contains_key("manual_updates")
516                    {
517                        for contract in &component.contract_ids {
518                            contracts_map
519                                .entry(contract.clone())
520                                .or_insert_with(HashSet::new)
521                                .insert(id.clone());
522                        }
523                        // Add DCI contracts so changes to these contracts trigger
524                        // an update
525                        for (_, tracing) in snapshot.entrypoints.iter() {
526                            for contract in tracing.accessed_slots.keys().cloned() {
527                                contracts_map
528                                    .entry(contract)
529                                    .or_insert_with(HashSet::new)
530                                    .insert(id.clone());
531                            }
532                        }
533                    }
534
535                    // Collect new pairs (components)
536                    new_pairs.insert(id.clone(), component.clone());
537
538                    // Store component for later batch insertion
539                    components_to_store.insert(id.clone(), component);
540
541                    // Construct state from snapshot
542                    if let Some(state_decode_f) = self.registry.get(protocol.as_str()) {
543                        match state_decode_f(
544                            snapshot,
545                            header.clone(),
546                            account_balances.clone(),
547                            self.state.clone(),
548                        )
549                        .await
550                        {
551                            Ok(state) => {
552                                new_components.insert(id.clone(), state);
553                            }
554                            Err(e) => {
555                                if self.skip_state_decode_failures {
556                                    warn!(pool = id, error = %e, "StateDecodingFailure");
557                                    msg_failed_components.insert(id.clone());
558                                    continue 'snapshot_loop;
559                                } else {
560                                    error!(pool = id, error = %e, "StateDecodingFailure");
561                                    return Err(StreamDecodeError::Fatal(format!("{e}")));
562                                }
563                            }
564                        }
565                    } else if self.skip_state_decode_failures {
566                        warn!(pool = id, "MissingDecoderRegistration");
567                        msg_failed_components.insert(id.clone());
568                        continue 'snapshot_loop;
569                    } else {
570                        error!(pool = id, "MissingDecoderRegistration");
571                        return Err(StreamDecodeError::Fatal(format!(
572                            "Missing decoder registration for: {id}"
573                        )));
574                    }
575                }
576            }
577
578            // Batch insert components into state
579            if !components_to_store.is_empty() {
580                let mut state_guard = self.state.write().await;
581                for (id, component) in components_to_store {
582                    state_guard
583                        .components
584                        .insert(id, component);
585                }
586            }
587
588            if !protocol_msg.snapshots.states.is_empty() {
589                info!("Decoded {} snapshots for protocol {protocol}", new_components.len());
590            }
591            if count_token_skips > 0 {
592                info!("Skipped {count_token_skips} pools due to missing tokens");
593            }
594
595            //TODO: should we remove failed components for new_components?
596            updated_states.extend(new_components);
597
598            // PROCESS DELTAS
599            if let Some(deltas) = protocol_msg.deltas.clone() {
600                // Update engine with account changes
601                let mut state_guard = self.state.write().await;
602
603                let mut account_update_by_address: HashMap<Address, AccountUpdate> = HashMap::new();
604                for (key, value) in deltas.account_updates.iter() {
605                    let mut update: AccountUpdate = value.clone().into();
606
607                    // TEMP PATCH (ENG-4993)
608                    //
609                    // The indexer emits deltas without code marked as creations, which crashes
610                    // TychoDB. Until fixed, treat them as updates (since EVM code cannot be
611                    // deleted).
612                    if update.code.is_none() && matches!(update.change, ChangeType::Creation) {
613                        error!(
614                            update = ?update,
615                            "FaultyCreationDelta"
616                        );
617                        update.change = ChangeType::Update;
618                    }
619
620                    if state_guard.tokens.contains_key(key) {
621                        let original_address = update.address;
622                        // If the account is a token, we need to handle it with a proxy contract.
623                        // Storage updates apply to the proxy contract (at original address).
624                        // Code updates (if any) apply to the token implementation contract (at
625                        // impl_addr).
626
627                        // Handle proxy contract updates
628                        let impl_addr = match state_guard
629                            .proxy_token_addresses
630                            .get(&original_address)
631                        {
632                            Some(impl_addr) => {
633                                // Token already has a proxy contract.
634
635                                // Apply the storage update to proxy contract
636                                let proxy_update = AccountUpdate { code: None, ..update.clone() };
637                                account_update_by_address.insert(original_address, proxy_update);
638
639                                *impl_addr
640                            }
641                            None => {
642                                // Token does not have a proxy contract yet, create one
643
644                                // Assign original token (implementation) contract to new proxy
645                                // address
646                                let impl_addr = generate_proxy_token_address(
647                                    state_guard.proxy_token_addresses.len() as u32,
648                                )?;
649                                state_guard
650                                    .proxy_token_addresses
651                                    .insert(original_address, impl_addr);
652
653                                // Create proxy token account with original account's storage (at
654                                // original address)
655                                let proxy_state = create_proxy_token_account(
656                                    original_address,
657                                    Some(impl_addr),
658                                    &update.slots,
659                                    update.chain,
660                                    update.balance,
661                                );
662                                account_update_by_address.insert(original_address, proxy_state);
663
664                                impl_addr
665                            }
666                        };
667
668                        // Apply code update to token implementation contract
669                        if update.code.is_some() {
670                            let impl_update = AccountUpdate {
671                                address: impl_addr,
672                                slots: HashMap::new(),
673                                ..update.clone()
674                            };
675                            account_update_by_address.insert(impl_addr, impl_update);
676                        }
677                    } else {
678                        // Not a token, apply update to the account at its original address
679                        account_update_by_address.insert(update.address, update);
680                    }
681                }
682                drop(state_guard);
683
684                let state_guard = self.state.read().await;
685                info!("Updating engine with {} contract deltas", deltas.account_updates.len());
686                update_engine(
687                    SHARED_TYCHO_DB.clone(),
688                    header.clone().block(),
689                    None,
690                    account_update_by_address,
691                )
692                .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
693                info!("Engine updated");
694
695                // Collect all pools related to the updated accounts
696                let mut pools_to_update = HashSet::new();
697                for (account, _update) in deltas.account_updates {
698                    // get new pools related to the account updated
699                    pools_to_update.extend(
700                        contracts_map
701                            .get(&account)
702                            .cloned()
703                            .unwrap_or_default(),
704                    );
705                    // get existing pools related to the account updated
706                    pools_to_update.extend(
707                        state_guard
708                            .contracts_map
709                            .get(&account)
710                            .cloned()
711                            .unwrap_or_default(),
712                    );
713                }
714
715                // Collect all balance changes this block
716                let all_balances = Balances {
717                    component_balances: deltas
718                        .component_balances
719                        .iter()
720                        .map(|(pool_id, bals)| {
721                            let mut balances = HashMap::new();
722                            for (t, b) in &bals.0 {
723                                balances.insert(t.clone(), b.balance.clone());
724                            }
725                            pools_to_update.insert(pool_id.clone());
726                            (pool_id.clone(), balances)
727                        })
728                        .collect(),
729                    account_balances: deltas
730                        .account_balances
731                        .iter()
732                        .map(|(account, bals)| {
733                            let mut balances = HashMap::new();
734                            for (t, b) in bals {
735                                balances.insert(t.clone(), b.balance.clone());
736                            }
737                            pools_to_update.extend(
738                                contracts_map
739                                    .get(account)
740                                    .cloned()
741                                    .unwrap_or_default(),
742                            );
743                            (account.clone(), balances)
744                        })
745                        .collect(),
746                };
747
748                // update states with protocol state deltas (attribute changes etc.)
749                for (id, update) in deltas.state_updates {
750                    // TODO: is this needed?
751                    let update_with_block =
752                        Self::add_block_info_to_delta(update, current_block.clone());
753                    match Self::apply_update(
754                        &id,
755                        update_with_block,
756                        &mut updated_states,
757                        &state_guard,
758                        &all_balances,
759                    ) {
760                        Ok(_) => {
761                            pools_to_update.remove(&id);
762                        }
763                        Err(e) => {
764                            if self.skip_state_decode_failures {
765                                warn!(pool = id, error = %e, "Failed to apply state update, marking component as removed");
766                                // Remove from updated_states if it was there
767                                updated_states.remove(&id);
768                                // Try to get component from new_pairs first, then from state
769                                if let Some(component) = new_pairs.remove(&id) {
770                                    removed_pairs.insert(id.clone(), component);
771                                } else if let Some(component) = state_guard.components.get(&id) {
772                                    removed_pairs.insert(id.clone(), component.clone());
773                                } else {
774                                    // Component not found in new_pairs or state, this shouldn't
775                                    // happen
776                                    warn!(pool = id, "Component not found in new_pairs or state, cannot add to removed_pairs");
777                                }
778                                pools_to_update.remove(&id);
779
780                                // Add to failed components
781                                msg_failed_components.insert(id.clone());
782                            } else {
783                                return Err(e);
784                            }
785                        }
786                    }
787                }
788
789                // update remaining pools linked to updated contracts/updated balances
790                for pool in pools_to_update {
791                    // TODO: is this needed?
792                    let default_delta_with_block = Self::add_block_info_to_delta(
793                        ProtocolStateDelta::default(),
794                        current_block.clone(),
795                    );
796                    match Self::apply_update(
797                        &pool,
798                        default_delta_with_block,
799                        &mut updated_states,
800                        &state_guard,
801                        &all_balances,
802                    ) {
803                        Ok(_) => {}
804                        Err(e) => {
805                            if self.skip_state_decode_failures {
806                                warn!(pool = pool, error = %e, "Failed to apply contract/balance update, marking component as removed");
807                                // Remove from updated_states if it was there
808                                updated_states.remove(&pool);
809                                // Try to get component from new_pairs first, then from state
810                                if let Some(component) = new_pairs.remove(&pool) {
811                                    removed_pairs.insert(pool.clone(), component);
812                                } else if let Some(component) = state_guard.components.get(&pool) {
813                                    removed_pairs.insert(pool.clone(), component.clone());
814                                } else {
815                                    // Component not found in new_pairs or state, this shouldn't
816                                    // happen
817                                    warn!(pool = pool, "Component not found in new_pairs or state, cannot add to removed_pairs");
818                                }
819
820                                // Add to failed components
821                                msg_failed_components.insert(pool.clone());
822                            } else {
823                                return Err(e);
824                            }
825                        }
826                    }
827                }
828            };
829        }
830
831        // Persist the newly added/updated states
832        let mut state_guard = self.state.write().await;
833
834        // Update failed components with any new ones
835        state_guard
836            .failed_components
837            .extend(msg_failed_components);
838
839        // Remove any failed components from Updates
840        // Perf: we could do it directly in the decoder logic to avoid some steps, but this logic is
841        // complex and this is more robust.
842        updated_states.retain(|id, _| {
843            !state_guard
844                .failed_components
845                .contains(id)
846        });
847        new_pairs.retain(|id, _| {
848            !state_guard
849                .failed_components
850                .contains(id)
851        });
852
853        state_guard
854            .states
855            .extend(updated_states.clone().into_iter());
856
857        // Add new components to persistent state
858        for (id, component) in new_pairs.iter() {
859            state_guard
860                .components
861                .insert(id.clone(), component.clone());
862        }
863
864        // Remove components from persistent state
865        for (id, _) in removed_pairs.iter() {
866            state_guard.components.remove(id);
867        }
868
869        for (key, values) in contracts_map {
870            state_guard
871                .contracts_map
872                .entry(key)
873                .or_insert_with(HashSet::new)
874                .extend(values);
875        }
876
877        // Send the tick with all updated states
878        Ok(Update::new(block_number_or_timestamp, updated_states, new_pairs)
879            .set_removed_pairs(removed_pairs)
880            .set_sync_states(msg.sync_states.clone()))
881    }
882
883    /// Add block information (number and timestamp) to a ProtocolStateDelta
884    fn add_block_info_to_delta(
885        mut delta: ProtocolStateDelta,
886        block_header_opt: Option<BlockHeader>,
887    ) -> ProtocolStateDelta {
888        if let Some(header) = block_header_opt {
889            // Add block_number and block_timestamp attributes to ensure pool states
890            // receive current block information during delta_transition
891            delta.updated_attributes.insert(
892                "block_number".to_string(),
893                Bytes::from(header.number.to_be_bytes().to_vec()),
894            );
895            delta.updated_attributes.insert(
896                "block_timestamp".to_string(),
897                Bytes::from(header.timestamp.to_be_bytes().to_vec()),
898            );
899        }
900        delta
901    }
902
903    fn apply_update(
904        id: &String,
905        update: ProtocolStateDelta,
906        updated_states: &mut HashMap<String, Box<dyn ProtocolSim>>,
907        state_guard: &RwLockReadGuard<'_, DecoderState>,
908        all_balances: &Balances,
909    ) -> Result<(), StreamDecodeError> {
910        match updated_states.entry(id.clone()) {
911            Entry::Occupied(mut entry) => {
912                // If state exists in updated_states, apply the delta to it
913                let state: &mut Box<dyn ProtocolSim> = entry.get_mut();
914                state
915                    .delta_transition(update, &state_guard.tokens, all_balances)
916                    .map_err(|e| {
917                        error!(pool = id, error = ?e, "DeltaTransitionError");
918                        StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
919                    })?;
920            }
921            Entry::Vacant(_) => {
922                match state_guard.states.get(id) {
923                    // If state does not exist in updated_states, apply the delta to the stored
924                    // state
925                    Some(stored_state) => {
926                        let mut state = stored_state.clone();
927                        state
928                            .delta_transition(update, &state_guard.tokens, all_balances)
929                            .map_err(|e| {
930                                error!(pool = id, error = ?e, "DeltaTransitionError");
931                                StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
932                            })?;
933                        updated_states.insert(id.clone(), state);
934                    }
935                    None => debug!(pool = id, reason = "MissingState", "DeltaTransitionError"),
936                }
937            }
938        }
939        Ok(())
940    }
941}
942
943/// Generate a proxy token address for a given token index
944fn generate_proxy_token_address(idx: u32) -> Result<Address, StreamDecodeError> {
945    let padded_idx = format!("{idx:x}");
946    let padded_zeroes = "0".repeat(33 - padded_idx.len());
947    let proxy_token_address = format!("{padded_zeroes}{padded_idx}BAdbaBe");
948    let decoded = hex::decode(proxy_token_address).map_err(|e| {
949        StreamDecodeError::Fatal(format!("Invalid proxy token address encoding: {e}"))
950    })?;
951
952    const ADDRESS_LENGTH: usize = 20;
953    if decoded.len() != ADDRESS_LENGTH {
954        return Err(StreamDecodeError::Fatal(format!(
955            "Invalid proxy token address length: expected {}, got {}",
956            ADDRESS_LENGTH,
957            decoded.len(),
958        )));
959    }
960
961    Ok(Address::from_slice(&decoded))
962}
963
964/// Create a proxy token account for a token at a given address
965///
966/// The proxy token account is created at the original token address and points to the new token
967/// address.
968fn create_proxy_token_account(
969    addr: Address,
970    new_address: Option<Address>,
971    storage: &HashMap<U256, U256>,
972    chain: Chain,
973    balance: Option<U256>,
974) -> AccountUpdate {
975    let mut slots = storage.clone();
976    if let Some(new_address) = new_address {
977        slots.insert(*IMPLEMENTATION_SLOT, U256::from_be_slice(new_address.as_slice()));
978    }
979
980    AccountUpdate {
981        address: addr,
982        chain,
983        slots,
984        balance,
985        code: Some(ERC20_PROXY_BYTECODE.to_vec()),
986        change: ChangeType::Creation,
987    }
988}
989
990#[cfg(test)]
991mock! {
992    #[derive(Debug)]
993    pub ProtocolSim {
994        pub fn fee(&self) -> f64;
995        pub fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError>;
996        pub fn get_amount_out(
997            &self,
998            amount_in: BigUint,
999            token_in: &Token,
1000            token_out: &Token,
1001        ) -> Result<GetAmountOutResult, SimulationError>;
1002        pub fn get_limits(
1003            &self,
1004            sell_token: Bytes,
1005            buy_token: Bytes,
1006        ) -> Result<(BigUint, BigUint), SimulationError>;
1007        pub fn delta_transition(
1008            &mut self,
1009            delta: ProtocolStateDelta,
1010            tokens: &HashMap<Bytes, Token>,
1011            balances: &Balances,
1012        ) -> Result<(), TransitionError<String>>;
1013        pub fn clone_box(&self) -> Box<dyn ProtocolSim>;
1014        pub fn eq(&self, other: &dyn ProtocolSim) -> bool;
1015    }
1016}
1017
1018#[cfg(test)]
1019impl ProtocolSim for MockProtocolSim {
1020    fn fee(&self) -> f64 {
1021        self.fee()
1022    }
1023
1024    fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError> {
1025        self.spot_price(base, quote)
1026    }
1027
1028    fn get_amount_out(
1029        &self,
1030        amount_in: BigUint,
1031        token_in: &Token,
1032        token_out: &Token,
1033    ) -> Result<GetAmountOutResult, SimulationError> {
1034        self.get_amount_out(amount_in, token_in, token_out)
1035    }
1036
1037    fn get_limits(
1038        &self,
1039        sell_token: Bytes,
1040        buy_token: Bytes,
1041    ) -> Result<(BigUint, BigUint), SimulationError> {
1042        self.get_limits(sell_token, buy_token)
1043    }
1044
1045    fn delta_transition(
1046        &mut self,
1047        delta: ProtocolStateDelta,
1048        tokens: &HashMap<Bytes, Token>,
1049        balances: &Balances,
1050    ) -> Result<(), TransitionError<String>> {
1051        self.delta_transition(delta, tokens, balances)
1052    }
1053
1054    fn clone_box(&self) -> Box<dyn ProtocolSim> {
1055        self.clone_box()
1056    }
1057
1058    fn as_any(&self) -> &dyn Any {
1059        panic!("MockProtocolSim does not support as_any")
1060    }
1061
1062    fn as_any_mut(&mut self) -> &mut dyn Any {
1063        panic!("MockProtocolSim does not support as_any_mut")
1064    }
1065
1066    fn eq(&self, other: &dyn ProtocolSim) -> bool {
1067        self.eq(other)
1068    }
1069}
1070
1071#[cfg(test)]
1072mod tests {
1073    use std::{fs, path::Path, str::FromStr};
1074
1075    use alloy::primitives::address;
1076    use mockall::predicate::*;
1077    use rstest::*;
1078    use tycho_client::feed::BlockHeader;
1079    use tycho_common::{models::Chain, Bytes};
1080
1081    use super::*;
1082    use crate::evm::protocol::uniswap_v2::state::UniswapV2State;
1083
1084    async fn setup_decoder(set_tokens: bool) -> TychoStreamDecoder<BlockHeader> {
1085        let mut decoder = TychoStreamDecoder::new();
1086        decoder.register_decoder::<UniswapV2State>("uniswap_v2");
1087        if set_tokens {
1088            let tokens = [
1089                Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0),
1090                Bytes::from("0xdac17f958d2ee523a2206206994597c13d831ec7").lpad(20, 0),
1091            ]
1092            .iter()
1093            .map(|addr| {
1094                let addr_str = format!("{addr:x}");
1095                (
1096                    addr.clone(),
1097                    Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1098                )
1099            })
1100            .collect();
1101            decoder.set_tokens(tokens).await;
1102        }
1103        decoder
1104    }
1105
1106    fn load_test_msg(name: &str) -> FeedMessage<BlockHeader> {
1107        let project_root = env!("CARGO_MANIFEST_DIR");
1108        let asset_path = Path::new(project_root).join(format!("tests/assets/decoder/{name}.json"));
1109        let json_data = fs::read_to_string(asset_path).expect("Failed to read test asset");
1110        serde_json::from_str(&json_data).expect("Failed to deserialize FeedMsg json!")
1111    }
1112
1113    #[tokio::test]
1114    async fn test_decode() {
1115        let decoder = setup_decoder(true).await;
1116
1117        let msg = load_test_msg("uniswap_v2_snapshot");
1118        let res1 = decoder
1119            .decode(&msg)
1120            .await
1121            .expect("decode failure");
1122        let msg = load_test_msg("uniswap_v2_delta");
1123        let res2 = decoder
1124            .decode(&msg)
1125            .await
1126            .expect("decode failure");
1127
1128        assert_eq!(res1.states.len(), 1);
1129        assert_eq!(res2.states.len(), 1);
1130        assert_eq!(res1.sync_states.len(), 1);
1131        assert_eq!(res2.sync_states.len(), 1);
1132    }
1133
1134    #[tokio::test]
1135    async fn test_decode_component_missing_token() {
1136        let decoder = setup_decoder(false).await;
1137        let tokens = [Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0)]
1138            .iter()
1139            .map(|addr| {
1140                let addr_str = format!("{addr:x}");
1141                (
1142                    addr.clone(),
1143                    Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1144                )
1145            })
1146            .collect();
1147        decoder.set_tokens(tokens).await;
1148
1149        let msg = load_test_msg("uniswap_v2_snapshot");
1150        let res1 = decoder
1151            .decode(&msg)
1152            .await
1153            .expect("decode failure");
1154
1155        assert_eq!(res1.states.len(), 0);
1156    }
1157
1158    #[tokio::test]
1159    async fn test_decode_component_bad_id() {
1160        let decoder = setup_decoder(true).await;
1161        let msg = load_test_msg("uniswap_v2_snapshot_broken_id");
1162
1163        match decoder.decode(&msg).await {
1164            Err(StreamDecodeError::Fatal(msg)) => {
1165                assert_eq!(msg, "Component id mismatch");
1166            }
1167            Ok(_) => {
1168                panic!("Expected failures to be raised")
1169            }
1170        }
1171    }
1172
1173    #[rstest]
1174    #[case(true)]
1175    #[case(false)]
1176    #[tokio::test]
1177    async fn test_decode_component_bad_state(#[case] skip_failures: bool) {
1178        let mut decoder = setup_decoder(true).await;
1179        decoder.skip_state_decode_failures = skip_failures;
1180
1181        let msg = load_test_msg("uniswap_v2_snapshot_broken_state");
1182        match decoder.decode(&msg).await {
1183            Err(StreamDecodeError::Fatal(msg)) => {
1184                if !skip_failures {
1185                    assert_eq!(msg, "Missing attributes reserve0");
1186                } else {
1187                    panic!("Expected failures to be ignored. Err: {msg}")
1188                }
1189            }
1190            Ok(res) => {
1191                if !skip_failures {
1192                    panic!("Expected failures to be raised")
1193                } else {
1194                    assert_eq!(res.states.len(), 0);
1195                }
1196            }
1197        }
1198    }
1199
1200    #[tokio::test]
1201    async fn test_decode_updates_state_on_contract_change() {
1202        let decoder = setup_decoder(true).await;
1203
1204        // Create the mock instances
1205        let mut mock_state = MockProtocolSim::new();
1206
1207        mock_state
1208            .expect_clone_box()
1209            .times(1)
1210            .returning(|| {
1211                let mut cloned_mock_state = MockProtocolSim::new();
1212                // Expect `delta_transition` to be called once with any parameters
1213                cloned_mock_state
1214                    .expect_delta_transition()
1215                    .times(1)
1216                    .returning(|_, _, _| Ok(()));
1217                cloned_mock_state
1218                    .expect_clone_box()
1219                    .times(1)
1220                    .returning(|| Box::new(MockProtocolSim::new()));
1221                Box::new(cloned_mock_state)
1222            });
1223
1224        // Insert mock state into `updated_states`
1225        let pool_id =
1226            "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2".to_string();
1227        decoder
1228            .state
1229            .write()
1230            .await
1231            .states
1232            .insert(pool_id.clone(), Box::new(mock_state) as Box<dyn ProtocolSim>);
1233        decoder
1234            .state
1235            .write()
1236            .await
1237            .contracts_map
1238            .insert(
1239                Bytes::from("0xba12222222228d8ba445958a75a0704d566bf2c8").lpad(20, 0),
1240                HashSet::from([pool_id.clone()]),
1241            );
1242
1243        // Load a test message containing a contract update
1244        let msg = load_test_msg("balancer_v2_delta");
1245
1246        // Decode the message
1247        let _ = decoder
1248            .decode(&msg)
1249            .await
1250            .expect("decode failure");
1251
1252        // The mock framework will assert that `delta_transition` was called exactly once
1253    }
1254
1255    #[test]
1256    fn test_generate_proxy_token_address() {
1257        let idx = 1;
1258        let generated_address =
1259            generate_proxy_token_address(idx).expect("proxy token address should be valid");
1260        assert_eq!(generated_address, address!("000000000000000000000000000000001badbabe"));
1261
1262        let idx = 123456;
1263        let generated_address =
1264            generate_proxy_token_address(idx).expect("proxy token address should be valid");
1265        assert_eq!(generated_address, address!("00000000000000000000000000001e240badbabe"));
1266    }
1267
1268    #[tokio::test(flavor = "multi_thread")]
1269    async fn test_euler_hook_low_pool_manager_balance() {
1270        let mut decoder = TychoStreamDecoder::new();
1271
1272        decoder.register_decoder_with_context::<crate::evm::protocol::uniswap_v4::state::UniswapV4State>(
1273            "uniswap_v4_hooks", DecoderContext::new().vm_traces(true)
1274        );
1275
1276        let weth = Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap();
1277        let teth = Bytes::from_str("0xd11c452fc99cf405034ee446803b6f6c1f6d5ed8").unwrap();
1278        let tokens = HashMap::from([
1279            (
1280                weth.clone(),
1281                Token::new(&weth, "WETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1282            ),
1283            (
1284                teth.clone(),
1285                Token::new(&teth, "tETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1286            ),
1287        ]);
1288
1289        decoder.set_tokens(tokens.clone()).await;
1290
1291        let msg = load_test_msg("euler_hook_snapshot");
1292        let res = decoder
1293            .decode(&msg)
1294            .await
1295            .expect("decode failure");
1296
1297        let pool_state = res
1298            .states
1299            .get("0xc70d7fbd7fcccdf726e02fed78548b40dc52502b097c7a1ee7d995f4d4396134")
1300            .expect("Couldn't find target pool");
1301        let amount_out = pool_state
1302            .get_amount_out(
1303                BigUint::from_str("1000000000000000000").unwrap(),
1304                tokens.get(&teth).unwrap(),
1305                tokens.get(&weth).unwrap(),
1306            )
1307            .expect("Get amount out failed");
1308
1309        assert_eq!(amount_out.amount, BigUint::from_str("1216190190361759119").unwrap());
1310    }
1311}