Skip to main content

tycho_simulation/evm/
decoder.rs

1use std::{
2    collections::{hash_map::Entry, HashMap, HashSet},
3    future::Future,
4    pin::Pin,
5    sync::Arc,
6};
7
8use alloy::primitives::{Address, U256};
9use thiserror::Error;
10use tokio::sync::{RwLock, RwLockReadGuard};
11use tracing::{debug, error, info, warn};
12use tycho_client::feed::{synchronizer::ComponentWithState, BlockHeader, FeedMessage, HeaderLike};
13use tycho_common::{
14    dto::{ChangeType, ProtocolStateDelta},
15    models::{token::Token, Chain},
16    simulation::protocol_sim::{Balances, ProtocolSim},
17    Bytes,
18};
19#[cfg(test)]
20use {
21    mockall::mock,
22    num_bigint::BigUint,
23    std::any::Any,
24    tycho_common::simulation::{
25        errors::{SimulationError, TransitionError},
26        protocol_sim::GetAmountOutResult,
27    },
28};
29
30use crate::{
31    evm::{
32        engine_db::{update_engine, SHARED_TYCHO_DB},
33        protocol::{
34            utils::bytes_to_address,
35            vm::{constants::ERC20_PROXY_BYTECODE, erc20_token::IMPLEMENTATION_SLOT},
36        },
37        tycho_models::{AccountUpdate, ResponseAccount},
38    },
39    protocol::{
40        errors::InvalidSnapshotError,
41        models::{DecoderContext, ProtocolComponent, TryFromWithBlock, Update},
42    },
43};
44
45#[derive(Error, Debug)]
46pub enum StreamDecodeError {
47    #[error("{0}")]
48    Fatal(String),
49}
50
51#[derive(Default)]
52struct DecoderState {
53    tokens: HashMap<Bytes, Token>,
54    states: HashMap<String, Box<dyn ProtocolSim>>,
55    components: HashMap<String, ProtocolComponent>,
56    // maps contract address to the pools they affect
57    contracts_map: HashMap<Bytes, HashSet<String>>,
58    // Maps original token address to their new proxy token address
59    proxy_token_addresses: HashMap<Address, Address>,
60    // Set of failed components, these are components that failed to decode and will not be emitted
61    // again TODO: handle more gracefully inside tycho-client. We could fetch the snapshot and
62    // try to decode it again.
63    failed_components: HashSet<String>,
64}
65
66type DecodeFut =
67    Pin<Box<dyn Future<Output = Result<Box<dyn ProtocolSim>, InvalidSnapshotError>> + Send + Sync>>;
68type AccountBalances = HashMap<Bytes, HashMap<Bytes, Bytes>>;
69type RegistryFn<H> = dyn Fn(ComponentWithState, H, AccountBalances, Arc<RwLock<DecoderState>>) -> DecodeFut
70    + Send
71    + Sync;
72type FilterFn = fn(&ComponentWithState) -> bool;
73
74/// A decoder to process raw messages.
75///
76/// This struct decodes incoming messages of type `FeedMessage` and converts it into the
77/// `BlockUpdate` struct.
78///
79/// # Important:
80/// - Supports registering exchanges and their associated filters for specific protocol components.
81/// - Allows the addition of client-side filters for custom conditions.
82///
83/// **Note:** The tokens provided during configuration will be used for decoding, ensuring
84/// efficient handling of protocol components. Protocol components containing tokens which are not
85/// included in this initial list, or added when applying deltas, will not be decoded.
86pub struct TychoStreamDecoder<H>
87where
88    H: HeaderLike,
89{
90    state: Arc<RwLock<DecoderState>>,
91    skip_state_decode_failures: bool,
92    min_token_quality: u32,
93    registry: HashMap<String, Box<RegistryFn<H>>>,
94    inclusion_filters: HashMap<String, FilterFn>,
95}
96
97impl<H> Default for TychoStreamDecoder<H>
98where
99    H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
100{
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106impl<H> TychoStreamDecoder<H>
107where
108    H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
109{
110    pub fn new() -> Self {
111        Self {
112            state: Arc::new(RwLock::new(DecoderState::default())),
113            skip_state_decode_failures: false,
114            min_token_quality: 51,
115            registry: HashMap::new(),
116            inclusion_filters: HashMap::new(),
117        }
118    }
119
120    /// Sets the currently known tokens which will be considered during decoding.
121    ///
122    /// Protocol components containing tokens which are not included in this initial list, or
123    /// added when applying deltas, will not be decoded.
124    pub async fn set_tokens(&self, tokens: HashMap<Bytes, Token>) {
125        let mut guard = self.state.write().await;
126        guard.tokens = tokens;
127    }
128
129    pub fn skip_state_decode_failures(&mut self, skip: bool) {
130        self.skip_state_decode_failures = skip;
131    }
132
133    /// Registers a decoder for a given exchange with a decoder context.
134    ///
135    /// This method maps an exchange identifier to a specific protocol simulation type.
136    /// The associated type must implement the `TryFromWithBlock` trait to enable decoding
137    /// of state updates from `ComponentWithState` objects. This allows the decoder to transform
138    /// the component data into the appropriate protocol simulation type based on the current
139    /// blockchain state and the provided block header.
140    /// For example, to register a decoder for the `uniswap_v2` exchange with an additional decoder
141    /// context, you must call this function with
142    /// `register_decoder_with_context::<UniswapV2State>("uniswap_v2", context)`.
143    /// This ensures that the exchange ID `uniswap_v2` is properly associated with the
144    /// `UniswapV2State` decoder for use in the protocol stream.
145    pub fn register_decoder_with_context<T>(&mut self, exchange: &str, context: DecoderContext)
146    where
147        T: ProtocolSim
148            + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
149            + Send
150            + 'static,
151    {
152        let decoder = Box::new(
153            move |component: ComponentWithState,
154                  header: H,
155                  account_balances: AccountBalances,
156                  state: Arc<RwLock<DecoderState>>| {
157                let context = context.clone();
158                Box::pin(async move {
159                    let guard = state.read().await;
160                    T::try_from_with_header(
161                        component,
162                        header,
163                        &account_balances,
164                        &guard.tokens,
165                        &context,
166                    )
167                    .await
168                    .map(|c| Box::new(c) as Box<dyn ProtocolSim>)
169                }) as DecodeFut
170            },
171        );
172        self.registry
173            .insert(exchange.to_string(), decoder);
174    }
175
176    /// Registers a decoder for a given exchange.
177    ///
178    /// This method maps an exchange identifier to a specific protocol simulation type.
179    /// The associated type must implement the `TryFromWithBlock` trait to enable decoding
180    /// of state updates from `ComponentWithState` objects. This allows the decoder to transform
181    /// the component data into the appropriate protocol simulation type based on the current
182    /// blockchain state and the provided block header.
183    /// For example, to register a decoder for the `uniswap_v2` exchange, you must call
184    /// this function with `register_decoder::<UniswapV2State>("uniswap_v2", vm_attributes)`.
185    /// This ensures that the exchange ID `uniswap_v2` is properly associated with the
186    /// `UniswapV2State` decoder for use in the protocol stream.
187    pub fn register_decoder<T>(&mut self, exchange: &str)
188    where
189        T: ProtocolSim
190            + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
191            + Send
192            + 'static,
193    {
194        let context = DecoderContext::new();
195        self.register_decoder_with_context::<T>(exchange, context);
196    }
197
198    /// Registers a client-side filter function for a given exchange.
199    ///
200    /// Associates a filter function with an exchange ID, enabling custom filtering of protocol
201    /// components. The filter function is applied client-side to refine the data received from the
202    /// stream. It can be used to exclude certain components based on attributes or conditions that
203    /// are not supported by the server-side filtering logic. This is particularly useful for
204    /// implementing custom behaviors, such as:
205    /// - Filtering out pools with specific attributes (e.g., unsupported features).
206    /// - Blacklisting pools based on custom criteria.
207    /// - Excluding pools that do not meet certain requirements (e.g., token pairs or liquidity
208    ///   constraints).
209    ///
210    /// For example, you might use a filter to exclude pools that are not fully supported in the
211    /// protocol, or to ignore pools with certain attributes that are irrelevant to your
212    /// application.
213    pub fn register_filter(&mut self, exchange: &str, predicate: FilterFn) {
214        self.inclusion_filters
215            .insert(exchange.to_string(), predicate);
216    }
217
218    /// Decodes a `FeedMessage` into a `BlockUpdate` containing the updated states of protocol
219    /// components
220    pub async fn decode(&self, msg: &FeedMessage<H>) -> Result<Update, StreamDecodeError> {
221        // stores all states updated in this tick/msg
222        let mut updated_states = HashMap::new();
223        let mut new_pairs = HashMap::new();
224        let mut removed_pairs = HashMap::new();
225        let mut contracts_map = HashMap::new();
226        let mut msg_failed_components = HashSet::new();
227
228        let header = msg
229            .state_msgs
230            .values()
231            .next()
232            .ok_or_else(|| StreamDecodeError::Fatal("Missing block!".into()))?
233            .header
234            .clone();
235
236        let block_number_or_timestamp = header
237            .clone()
238            .block_number_or_timestamp();
239        let current_block = header.clone().block();
240
241        for (protocol, protocol_msg) in msg.state_msgs.iter() {
242            // Add any new tokens
243            if let Some(deltas) = protocol_msg.deltas.as_ref() {
244                let mut state_guard = self.state.write().await;
245
246                let new_tokens = deltas
247                    .new_tokens
248                    .iter()
249                    .filter(|(addr, t)| {
250                        t.quality >= self.min_token_quality &&
251                            !state_guard.tokens.contains_key(*addr)
252                    })
253                    .filter_map(|(addr, t)| {
254                        t.clone()
255                            .try_into()
256                            .map(|token| (addr.clone(), token))
257                            .inspect_err(|e| {
258                                warn!("Failed decoding token {e:?} {addr:#044x}");
259                                *e
260                            })
261                            .ok()
262                    })
263                    .collect::<HashMap<Bytes, Token>>();
264
265                if !new_tokens.is_empty() {
266                    debug!(n = new_tokens.len(), "NewTokens");
267                    state_guard.tokens.extend(new_tokens);
268                }
269            }
270
271            // Remove untracked components
272            {
273                let mut state_guard = self.state.write().await;
274                let removed_components: Vec<(String, ProtocolComponent)> = protocol_msg
275                    .removed_components
276                    .iter()
277                    .map(|(id, comp)| {
278                        if *id != comp.id {
279                            error!(
280                                "Component id mismatch in removed components {id} != {}",
281                                comp.id
282                            );
283                            return Err(StreamDecodeError::Fatal("Component id mismatch".into()));
284                        }
285
286                        let tokens = comp
287                            .tokens
288                            .iter()
289                            .flat_map(|addr| state_guard.tokens.get(addr).cloned())
290                            .collect::<Vec<_>>();
291
292                        if tokens.len() == comp.tokens.len() {
293                            Ok(Some((
294                                id.clone(),
295                                ProtocolComponent::from_with_tokens(comp.clone(), tokens),
296                            )))
297                        } else {
298                            Ok(None)
299                        }
300                    })
301                    .collect::<Result<Vec<Option<(String, ProtocolComponent)>>, StreamDecodeError>>(
302                    )?
303                    .into_iter()
304                    .flatten()
305                    .collect();
306
307                // Remove components from state and add to removed_pairs
308                for (id, component) in removed_components {
309                    state_guard.components.remove(&id);
310                    state_guard.states.remove(&id);
311                    removed_pairs.insert(id, component);
312                }
313
314                // UPDATE VM STORAGE
315                info!(
316                    "Processing {} contracts from snapshots",
317                    protocol_msg
318                        .snapshots
319                        .get_vm_storage()
320                        .len()
321                );
322
323                let mut proxy_token_accounts: HashMap<Address, AccountUpdate> = HashMap::new();
324                let mut storage_by_address: HashMap<Address, ResponseAccount> = HashMap::new();
325                for (key, value) in protocol_msg
326                    .snapshots
327                    .get_vm_storage()
328                    .iter()
329                {
330                    let account: ResponseAccount = value.clone().into();
331
332                    if state_guard.tokens.contains_key(key) {
333                        let original_address = account.address;
334                        // To work with Tycho's token overwrites system, if we get account
335                        // snapshots for a token we must handle them with a proxy/wrapper
336                        // contract.
337                        // Note: storage for the original contract must be set at the proxy
338                        // contract address. This is because the proxy contract uses
339                        // delegatecall to the original (implementation) contract.
340
341                        // Handle proxy token accounts
342                        let (impl_addr, proxy_state) = match state_guard
343                            .proxy_token_addresses
344                            .get(&original_address)
345                        {
346                            Some(impl_addr) => {
347                                // Token already has a proxy contract, simply update it.
348
349                                // Note: we apply the snapshot as an update. This is to cover the
350                                // case where a contract may be stale as it stopped being tracked
351                                // for some reason (e.g. due to a drop in tvl) and is now being
352                                // tracked again.
353                                let proxy_state = AccountUpdate::new(
354                                    original_address,
355                                    value.chain.into(),
356                                    account.slots.clone(),
357                                    Some(account.native_balance),
358                                    None,
359                                    ChangeType::Update,
360                                );
361                                (*impl_addr, proxy_state)
362                            }
363                            None => {
364                                // Token does not have a proxy contract yet, create one
365
366                                // Assign original token contract to new address
367                                let impl_addr = generate_proxy_token_address(
368                                    state_guard.proxy_token_addresses.len() as u32,
369                                )?;
370                                state_guard
371                                    .proxy_token_addresses
372                                    .insert(original_address, impl_addr);
373
374                                // Add proxy token contract at original token address
375                                let proxy_state = create_proxy_token_account(
376                                    original_address,
377                                    Some(impl_addr),
378                                    &account.slots,
379                                    value.chain.into(),
380                                    Some(account.native_balance),
381                                );
382
383                                (impl_addr, proxy_state)
384                            }
385                        };
386
387                        proxy_token_accounts.insert(original_address, proxy_state);
388
389                        // Assign original token contract to the implementation address
390                        let impl_update = ResponseAccount {
391                            address: impl_addr,
392                            slots: HashMap::new(),
393                            ..account.clone()
394                        };
395                        storage_by_address.insert(impl_addr, impl_update);
396                    } else {
397                        // Not a token, apply snapshot to the account at its original address
398                        storage_by_address.insert(account.address, account);
399                    }
400                }
401
402                info!("Updating engine with {} contracts from snapshots", storage_by_address.len());
403                update_engine(
404                    SHARED_TYCHO_DB.clone(),
405                    header.clone().block(),
406                    Some(storage_by_address),
407                    proxy_token_accounts,
408                )
409                .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
410                info!("Engine updated");
411                drop(state_guard);
412            }
413
414            // Construct a contract to token balances map: HashMap<ContractAddress,
415            // HashMap<TokenAddress, Balance>>
416            let account_balances = protocol_msg
417                .clone()
418                .snapshots
419                .get_vm_storage()
420                .iter()
421                .filter_map(|(addr, acc)| {
422                    let balances = acc.token_balances.clone();
423                    if balances.is_empty() {
424                        return None;
425                    }
426                    Some((addr.clone(), balances))
427                })
428                .collect::<AccountBalances>();
429
430            let mut new_components = HashMap::new();
431            let mut count_token_skips = 0;
432            let mut components_to_store = HashMap::new();
433            {
434                let state_guard = self.state.read().await;
435
436                // PROCESS SNAPSHOTS
437                'snapshot_loop: for (id, snapshot) in protocol_msg
438                    .snapshots
439                    .get_states()
440                    .clone()
441                {
442                    // Skip any unsupported pools
443                    if self
444                        .inclusion_filters
445                        .get(protocol.as_str())
446                        .is_some_and(|predicate| !predicate(&snapshot))
447                    {
448                        continue;
449                    }
450
451                    // Construct component from snapshot
452                    let mut component_tokens = Vec::new();
453                    let mut new_tokens_accounts = HashMap::new();
454                    for token in snapshot.component.tokens.clone() {
455                        match state_guard.tokens.get(&token) {
456                            Some(token) => {
457                                component_tokens.push(token.clone());
458
459                                // If the token is not an existing proxy token, we need to add it to
460                                // the simulation engine
461                                let token_address = match bytes_to_address(&token.address) {
462                                    Ok(addr) => addr,
463                                    Err(_) => {
464                                        count_token_skips += 1;
465                                        msg_failed_components.insert(id.clone());
466                                        warn!(
467                                            "Token address could not be decoded {}, ignoring pool {:x?}",
468                                            token.address, id
469                                        );
470                                        continue 'snapshot_loop;
471                                    }
472                                };
473                                // Deploy a proxy account without an implementation set
474                                if !state_guard
475                                    .proxy_token_addresses
476                                    .contains_key(&token_address)
477                                {
478                                    new_tokens_accounts.insert(
479                                        token_address,
480                                        create_proxy_token_account(
481                                            token_address,
482                                            None,
483                                            &HashMap::new(),
484                                            snapshot.component.chain.into(),
485                                            None,
486                                        ),
487                                    );
488                                }
489                            }
490                            None => {
491                                count_token_skips += 1;
492                                msg_failed_components.insert(id.clone());
493                                debug!("Token not found {}, ignoring pool {:x?}", token, id);
494                                continue 'snapshot_loop;
495                            }
496                        }
497                    }
498                    let component = ProtocolComponent::from_with_tokens(
499                        snapshot.component.clone(),
500                        component_tokens,
501                    );
502
503                    // Add new tokens to the simulation engine
504                    if !new_tokens_accounts.is_empty() {
505                        update_engine(
506                            SHARED_TYCHO_DB.clone(),
507                            header.clone().block(),
508                            None,
509                            new_tokens_accounts,
510                        )
511                        .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
512                    }
513
514                    // collect contracts:ids mapping for states that should update on contract
515                    // changes (non-manual updates)
516                    if !component
517                        .static_attributes
518                        .contains_key("manual_updates")
519                    {
520                        for contract in &component.contract_ids {
521                            contracts_map
522                                .entry(contract.clone())
523                                .or_insert_with(HashSet::new)
524                                .insert(id.clone());
525                        }
526                        // Add DCI contracts so changes to these contracts trigger
527                        // an update
528                        for (_, tracing) in snapshot.entrypoints.iter() {
529                            for contract in tracing.accessed_slots.keys().cloned() {
530                                contracts_map
531                                    .entry(contract)
532                                    .or_insert_with(HashSet::new)
533                                    .insert(id.clone());
534                            }
535                        }
536                    }
537
538                    // Collect new pairs (components)
539                    new_pairs.insert(id.clone(), component.clone());
540
541                    // Store component for later batch insertion
542                    components_to_store.insert(id.clone(), component);
543
544                    // Construct state from snapshot
545                    if let Some(state_decode_f) = self.registry.get(protocol.as_str()) {
546                        match state_decode_f(
547                            snapshot,
548                            header.clone(),
549                            account_balances.clone(),
550                            self.state.clone(),
551                        )
552                        .await
553                        {
554                            Ok(state) => {
555                                new_components.insert(id.clone(), state);
556                            }
557                            Err(e) => {
558                                if self.skip_state_decode_failures {
559                                    warn!(pool = id, error = %e, "StateDecodingFailure");
560                                    msg_failed_components.insert(id.clone());
561                                    continue 'snapshot_loop;
562                                } else {
563                                    error!(pool = id, error = %e, "StateDecodingFailure");
564                                    return Err(StreamDecodeError::Fatal(format!("{e}")));
565                                }
566                            }
567                        }
568                    } else if self.skip_state_decode_failures {
569                        warn!(pool = id, "MissingDecoderRegistration");
570                        msg_failed_components.insert(id.clone());
571                        continue 'snapshot_loop;
572                    } else {
573                        error!(pool = id, "MissingDecoderRegistration");
574                        return Err(StreamDecodeError::Fatal(format!(
575                            "Missing decoder registration for: {id}"
576                        )));
577                    }
578                }
579            }
580
581            // Batch insert components into state
582            if !components_to_store.is_empty() {
583                let mut state_guard = self.state.write().await;
584                for (id, component) in components_to_store {
585                    state_guard
586                        .components
587                        .insert(id, component);
588                }
589            }
590
591            if !protocol_msg.snapshots.states.is_empty() {
592                info!("Decoded {} snapshots for protocol {protocol}", new_components.len());
593            }
594            if count_token_skips > 0 {
595                info!("Skipped {count_token_skips} pools due to missing tokens");
596            }
597
598            //TODO: should we remove failed components for new_components?
599            updated_states.extend(new_components);
600
601            // PROCESS DELTAS
602            if let Some(deltas) = protocol_msg.deltas.clone() {
603                // Update engine with account changes
604                let mut state_guard = self.state.write().await;
605
606                let mut account_update_by_address: HashMap<Address, AccountUpdate> = HashMap::new();
607                for (key, value) in deltas.account_updates.iter() {
608                    let mut update: AccountUpdate = value.clone().into();
609
610                    // TEMP PATCH (ENG-4993)
611                    //
612                    // The indexer emits deltas without code marked as creations, which crashes
613                    // TychoDB. Until fixed, treat them as updates (since EVM code cannot be
614                    // deleted).
615                    if update.code.is_none() && matches!(update.change, ChangeType::Creation) {
616                        error!(
617                            update = ?update,
618                            "FaultyCreationDelta"
619                        );
620                        update.change = ChangeType::Update;
621                    }
622
623                    if state_guard.tokens.contains_key(key) {
624                        let original_address = update.address;
625                        // If the account is a token, we need to handle it with a proxy contract.
626                        // Storage updates apply to the proxy contract (at original address).
627                        // Code updates (if any) apply to the token implementation contract (at
628                        // impl_addr).
629
630                        // Handle proxy contract updates
631                        let impl_addr = match state_guard
632                            .proxy_token_addresses
633                            .get(&original_address)
634                        {
635                            Some(impl_addr) => {
636                                // Token already has a proxy contract.
637
638                                // Apply the storage update to proxy contract
639                                let proxy_update = AccountUpdate { code: None, ..update.clone() };
640                                account_update_by_address.insert(original_address, proxy_update);
641
642                                *impl_addr
643                            }
644                            None => {
645                                // Token does not have a proxy contract yet, create one
646
647                                // Assign original token (implementation) contract to new proxy
648                                // address
649                                let impl_addr = generate_proxy_token_address(
650                                    state_guard.proxy_token_addresses.len() as u32,
651                                )?;
652                                state_guard
653                                    .proxy_token_addresses
654                                    .insert(original_address, impl_addr);
655
656                                // Create proxy token account with original account's storage (at
657                                // original address)
658                                let proxy_state = create_proxy_token_account(
659                                    original_address,
660                                    Some(impl_addr),
661                                    &update.slots,
662                                    update.chain,
663                                    update.balance,
664                                );
665                                account_update_by_address.insert(original_address, proxy_state);
666
667                                impl_addr
668                            }
669                        };
670
671                        // Apply code update to token implementation contract
672                        if update.code.is_some() {
673                            let impl_update = AccountUpdate {
674                                address: impl_addr,
675                                slots: HashMap::new(),
676                                ..update.clone()
677                            };
678                            account_update_by_address.insert(impl_addr, impl_update);
679                        }
680                    } else {
681                        // Not a token, apply update to the account at its original address
682                        account_update_by_address.insert(update.address, update);
683                    }
684                }
685                drop(state_guard);
686
687                let state_guard = self.state.read().await;
688                info!("Updating engine with {} contract deltas", deltas.account_updates.len());
689                update_engine(
690                    SHARED_TYCHO_DB.clone(),
691                    header.clone().block(),
692                    None,
693                    account_update_by_address,
694                )
695                .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
696                info!("Engine updated");
697
698                // Collect all pools related to the updated accounts
699                let mut pools_to_update = HashSet::new();
700                for (account, _update) in deltas.account_updates {
701                    // get new pools related to the account updated
702                    pools_to_update.extend(
703                        contracts_map
704                            .get(&account)
705                            .cloned()
706                            .unwrap_or_default(),
707                    );
708                    // get existing pools related to the account updated
709                    pools_to_update.extend(
710                        state_guard
711                            .contracts_map
712                            .get(&account)
713                            .cloned()
714                            .unwrap_or_default(),
715                    );
716                }
717
718                // Collect all balance changes this block
719                let all_balances = Balances {
720                    component_balances: deltas
721                        .component_balances
722                        .iter()
723                        .map(|(pool_id, bals)| {
724                            let mut balances = HashMap::new();
725                            for (t, b) in &bals.0 {
726                                balances.insert(t.clone(), b.balance.clone());
727                            }
728                            pools_to_update.insert(pool_id.clone());
729                            (pool_id.clone(), balances)
730                        })
731                        .collect(),
732                    account_balances: deltas
733                        .account_balances
734                        .iter()
735                        .map(|(account, bals)| {
736                            let mut balances = HashMap::new();
737                            for (t, b) in bals {
738                                balances.insert(t.clone(), b.balance.clone());
739                            }
740                            pools_to_update.extend(
741                                contracts_map
742                                    .get(account)
743                                    .cloned()
744                                    .unwrap_or_default(),
745                            );
746                            (account.clone(), balances)
747                        })
748                        .collect(),
749                };
750
751                // update states with protocol state deltas (attribute changes etc.)
752                for (id, update) in deltas.state_updates {
753                    // TODO: is this needed?
754                    let update_with_block =
755                        Self::add_block_info_to_delta(update, current_block.clone());
756                    match Self::apply_update(
757                        &id,
758                        update_with_block,
759                        &mut updated_states,
760                        &state_guard,
761                        &all_balances,
762                    ) {
763                        Ok(_) => {
764                            pools_to_update.remove(&id);
765                        }
766                        Err(e) => {
767                            if self.skip_state_decode_failures {
768                                warn!(pool = id, error = %e, "Failed to apply state update, marking component as removed");
769                                // Remove from updated_states if it was there
770                                updated_states.remove(&id);
771                                // Try to get component from new_pairs first, then from state
772                                if let Some(component) = new_pairs.remove(&id) {
773                                    removed_pairs.insert(id.clone(), component);
774                                } else if let Some(component) = state_guard.components.get(&id) {
775                                    removed_pairs.insert(id.clone(), component.clone());
776                                } else {
777                                    // Component not found in new_pairs or state, this shouldn't
778                                    // happen
779                                    warn!(pool = id, "Component not found in new_pairs or state, cannot add to removed_pairs");
780                                }
781                                pools_to_update.remove(&id);
782
783                                // Add to failed components
784                                msg_failed_components.insert(id.clone());
785                            } else {
786                                return Err(e);
787                            }
788                        }
789                    }
790                }
791
792                // update remaining pools linked to updated contracts/updated balances
793                for pool in pools_to_update {
794                    // TODO: is this needed?
795                    let default_delta_with_block = Self::add_block_info_to_delta(
796                        ProtocolStateDelta::default(),
797                        current_block.clone(),
798                    );
799                    match Self::apply_update(
800                        &pool,
801                        default_delta_with_block,
802                        &mut updated_states,
803                        &state_guard,
804                        &all_balances,
805                    ) {
806                        Ok(_) => {}
807                        Err(e) => {
808                            if self.skip_state_decode_failures {
809                                warn!(pool = pool, error = %e, "Failed to apply contract/balance update, marking component as removed");
810                                // Remove from updated_states if it was there
811                                updated_states.remove(&pool);
812                                // Try to get component from new_pairs first, then from state
813                                if let Some(component) = new_pairs.remove(&pool) {
814                                    removed_pairs.insert(pool.clone(), component);
815                                } else if let Some(component) = state_guard.components.get(&pool) {
816                                    removed_pairs.insert(pool.clone(), component.clone());
817                                } else {
818                                    // Component not found in new_pairs or state, this shouldn't
819                                    // happen
820                                    warn!(pool = pool, "Component not found in new_pairs or state, cannot add to removed_pairs");
821                                }
822
823                                // Add to failed components
824                                msg_failed_components.insert(pool.clone());
825                            } else {
826                                return Err(e);
827                            }
828                        }
829                    }
830                }
831            };
832        }
833
834        // Persist the newly added/updated states
835        let mut state_guard = self.state.write().await;
836
837        // Update failed components with any new ones
838        state_guard
839            .failed_components
840            .extend(msg_failed_components);
841
842        // Remove any failed components from Updates
843        // Perf: we could do it directly in the decoder logic to avoid some steps, but this logic is
844        // complex and this is more robust.
845        updated_states.retain(|id, _| {
846            !state_guard
847                .failed_components
848                .contains(id)
849        });
850        new_pairs.retain(|id, _| {
851            !state_guard
852                .failed_components
853                .contains(id)
854        });
855
856        state_guard
857            .states
858            .extend(updated_states.clone().into_iter());
859
860        // Add new components to persistent state
861        for (id, component) in new_pairs.iter() {
862            state_guard
863                .components
864                .insert(id.clone(), component.clone());
865        }
866
867        // Remove components from persistent state
868        for (id, _) in removed_pairs.iter() {
869            state_guard.components.remove(id);
870        }
871
872        for (key, values) in contracts_map {
873            state_guard
874                .contracts_map
875                .entry(key)
876                .or_insert_with(HashSet::new)
877                .extend(values);
878        }
879
880        // Send the tick with all updated states
881        Ok(Update::new(block_number_or_timestamp, updated_states, new_pairs)
882            .set_removed_pairs(removed_pairs)
883            .set_sync_states(msg.sync_states.clone()))
884    }
885
886    /// Add block information (number and timestamp) to a ProtocolStateDelta
887    fn add_block_info_to_delta(
888        mut delta: ProtocolStateDelta,
889        block_header_opt: Option<BlockHeader>,
890    ) -> ProtocolStateDelta {
891        if let Some(header) = block_header_opt {
892            // Add block_number and block_timestamp attributes to ensure pool states
893            // receive current block information during delta_transition
894            delta.updated_attributes.insert(
895                "block_number".to_string(),
896                Bytes::from(header.number.to_be_bytes().to_vec()),
897            );
898            delta.updated_attributes.insert(
899                "block_timestamp".to_string(),
900                Bytes::from(header.timestamp.to_be_bytes().to_vec()),
901            );
902        }
903        delta
904    }
905
906    fn apply_update(
907        id: &String,
908        update: ProtocolStateDelta,
909        updated_states: &mut HashMap<String, Box<dyn ProtocolSim>>,
910        state_guard: &RwLockReadGuard<'_, DecoderState>,
911        all_balances: &Balances,
912    ) -> Result<(), StreamDecodeError> {
913        match updated_states.entry(id.clone()) {
914            Entry::Occupied(mut entry) => {
915                // If state exists in updated_states, apply the delta to it
916                let state: &mut Box<dyn ProtocolSim> = entry.get_mut();
917                state
918                    .delta_transition(update, &state_guard.tokens, all_balances)
919                    .map_err(|e| {
920                        error!(pool = id, error = ?e, "DeltaTransitionError");
921                        StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
922                    })?;
923            }
924            Entry::Vacant(_) => {
925                match state_guard.states.get(id) {
926                    // If state does not exist in updated_states, apply the delta to the stored
927                    // state
928                    Some(stored_state) => {
929                        let mut state = stored_state.clone();
930                        state
931                            .delta_transition(update, &state_guard.tokens, all_balances)
932                            .map_err(|e| {
933                                error!(pool = id, error = ?e, "DeltaTransitionError");
934                                StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
935                            })?;
936                        updated_states.insert(id.clone(), state);
937                    }
938                    None => debug!(pool = id, reason = "MissingState", "DeltaTransitionError"),
939                }
940            }
941        }
942        Ok(())
943    }
944}
945
946/// Generate a proxy token address for a given token index
947fn generate_proxy_token_address(idx: u32) -> Result<Address, StreamDecodeError> {
948    let padded_idx = format!("{idx:x}");
949    let padded_zeroes = "0".repeat(33 - padded_idx.len());
950    let proxy_token_address = format!("{padded_zeroes}{padded_idx}BAdbaBe");
951    let decoded = hex::decode(proxy_token_address).map_err(|e| {
952        StreamDecodeError::Fatal(format!("Invalid proxy token address encoding: {e}"))
953    })?;
954
955    const ADDRESS_LENGTH: usize = 20;
956    if decoded.len() != ADDRESS_LENGTH {
957        return Err(StreamDecodeError::Fatal(format!(
958            "Invalid proxy token address length: expected {}, got {}",
959            ADDRESS_LENGTH,
960            decoded.len(),
961        )));
962    }
963
964    Ok(Address::from_slice(&decoded))
965}
966
967/// Create a proxy token account for a token at a given address
968///
969/// The proxy token account is created at the original token address and points to the new token
970/// address.
971fn create_proxy_token_account(
972    addr: Address,
973    new_address: Option<Address>,
974    storage: &HashMap<U256, U256>,
975    chain: Chain,
976    balance: Option<U256>,
977) -> AccountUpdate {
978    let mut slots = storage.clone();
979    if let Some(new_address) = new_address {
980        slots.insert(*IMPLEMENTATION_SLOT, U256::from_be_slice(new_address.as_slice()));
981    }
982
983    AccountUpdate {
984        address: addr,
985        chain,
986        slots,
987        balance,
988        code: Some(ERC20_PROXY_BYTECODE.to_vec()),
989        change: ChangeType::Creation,
990    }
991}
992
// Test-only mock of a protocol simulation state.
//
// `mockall::mock!` generates a `MockProtocolSim` struct with the inherent methods listed
// below, plus matching `expect_*` methods for setting expectations in tests. The
// `ProtocolSim` trait itself is implemented separately further down by forwarding to
// these inherent methods.
#[cfg(test)]
mock! {
    #[derive(Debug)]
    pub ProtocolSim {
        pub fn fee(&self) -> f64;
        pub fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError>;
        pub fn get_amount_out(
            &self,
            amount_in: BigUint,
            token_in: &Token,
            token_out: &Token,
        ) -> Result<GetAmountOutResult, SimulationError>;
        pub fn get_limits(
            &self,
            sell_token: Bytes,
            buy_token: Bytes,
        ) -> Result<(BigUint, BigUint), SimulationError>;
        pub fn delta_transition(
            &mut self,
            delta: ProtocolStateDelta,
            tokens: &HashMap<Bytes, Token>,
            balances: &Balances,
        ) -> Result<(), TransitionError>;
        // Mocked so tests can control what cloning a boxed state yields.
        pub fn clone_box(&self) -> Box<dyn ProtocolSim>;
        pub fn eq(&self, other: &dyn ProtocolSim) -> bool;
    }
}
1020
// NOTE(review): presumably generates the serialization-related impls that `ProtocolSim`
// requires, in a form that rejects (de)serialization for the mock — confirm against the
// macro definition. The string is the protocol name handed to the macro.
#[cfg(test)]
crate::impl_non_serializable_protocol!(MockProtocolSim, "test protocol");
1023
// `ProtocolSim` implementation for the mock.
//
// Each simulation method forwards to the mock's inherent method of the same name. This is
// not infinite recursion: method-call syntax resolves to an inherent method before a trait
// method, so `self.fee()` calls the mockall-generated `MockProtocolSim::fee`.
#[cfg(test)]
impl ProtocolSim for MockProtocolSim {
    fn fee(&self) -> f64 {
        self.fee()
    }

    fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError> {
        self.spot_price(base, quote)
    }

    fn get_amount_out(
        &self,
        amount_in: BigUint,
        token_in: &Token,
        token_out: &Token,
    ) -> Result<GetAmountOutResult, SimulationError> {
        self.get_amount_out(amount_in, token_in, token_out)
    }

    fn get_limits(
        &self,
        sell_token: Bytes,
        buy_token: Bytes,
    ) -> Result<(BigUint, BigUint), SimulationError> {
        self.get_limits(sell_token, buy_token)
    }

    fn delta_transition(
        &mut self,
        delta: ProtocolStateDelta,
        tokens: &HashMap<Bytes, Token>,
        balances: &Balances,
    ) -> Result<(), TransitionError> {
        self.delta_transition(delta, tokens, balances)
    }

    fn clone_box(&self) -> Box<dyn ProtocolSim> {
        self.clone_box()
    }

    // Downcasting is not supported by the mock; calling either accessor panics.
    fn as_any(&self) -> &dyn Any {
        panic!("MockProtocolSim does not support as_any")
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        panic!("MockProtocolSim does not support as_any_mut")
    }

    fn eq(&self, other: &dyn ProtocolSim) -> bool {
        self.eq(other)
    }

    // The typetag hooks are not expected to be reached in tests.
    fn typetag_name(&self) -> &'static str {
        unreachable!()
    }

    fn typetag_deserialize(&self) {
        unreachable!()
    }
}
1084
// Unit tests for the stream decoder. Most tests replay JSON feed messages stored under
// `tests/assets/decoder/` through a `TychoStreamDecoder`.
#[cfg(test)]
mod tests {
    use std::{fs, path::Path, str::FromStr};

    use alloy::primitives::address;
    use mockall::predicate::*;
    use rstest::*;
    use tycho_client::feed::BlockHeader;
    use tycho_common::{models::Chain, Bytes};

    use super::*;
    use crate::evm::protocol::uniswap_v2::state::UniswapV2State;

    /// Build a decoder with the `uniswap_v2` decoder registered.
    ///
    /// When `set_tokens` is true, the two token addresses below are registered with the
    /// decoder as well.
    async fn setup_decoder(set_tokens: bool) -> TychoStreamDecoder<BlockHeader> {
        let mut decoder = TychoStreamDecoder::new();
        decoder.register_decoder::<UniswapV2State>("uniswap_v2");
        if set_tokens {
            let tokens = [
                Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0),
                Bytes::from("0xdac17f958d2ee523a2206206994597c13d831ec7").lpad(20, 0),
            ]
            .iter()
            .map(|addr| {
                // The hex-formatted address doubles as the token symbol.
                let addr_str = format!("{addr:x}");
                (
                    addr.clone(),
                    Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
                )
            })
            .collect();
            decoder.set_tokens(tokens).await;
        }
        decoder
    }

    /// Load and deserialize the feed message `tests/assets/decoder/{name}.json`.
    ///
    /// Panics if the file cannot be read or parsed.
    fn load_test_msg(name: &str) -> FeedMessage<BlockHeader> {
        let project_root = env!("CARGO_MANIFEST_DIR");
        let asset_path = Path::new(project_root).join(format!("tests/assets/decoder/{name}.json"));
        let json_data = fs::read_to_string(asset_path).expect("Failed to read test asset");
        serde_json::from_str(&json_data).expect("Failed to deserialize FeedMsg json!")
    }

    /// A snapshot followed by a delta each yield one state and one sync state.
    #[tokio::test]
    async fn test_decode() {
        let decoder = setup_decoder(true).await;

        let msg = load_test_msg("uniswap_v2_snapshot");
        let res1 = decoder
            .decode(&msg)
            .await
            .expect("decode failure");
        let msg = load_test_msg("uniswap_v2_delta");
        let res2 = decoder
            .decode(&msg)
            .await
            .expect("decode failure");

        assert_eq!(res1.states.len(), 1);
        assert_eq!(res2.states.len(), 1);
        assert_eq!(res1.sync_states.len(), 1);
        assert_eq!(res2.sync_states.len(), 1);
    }

    /// With only one of the two tokens registered, decoding succeeds but emits no states.
    #[tokio::test]
    async fn test_decode_component_missing_token() {
        let decoder = setup_decoder(false).await;
        let tokens = [Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0)]
            .iter()
            .map(|addr| {
                let addr_str = format!("{addr:x}");
                (
                    addr.clone(),
                    Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
                )
            })
            .collect();
        decoder.set_tokens(tokens).await;

        let msg = load_test_msg("uniswap_v2_snapshot");
        let res1 = decoder
            .decode(&msg)
            .await
            .expect("decode failure");

        assert_eq!(res1.states.len(), 0);
    }

    /// The broken-id snapshot fixture must fail with "Component id mismatch".
    #[tokio::test]
    async fn test_decode_component_bad_id() {
        let decoder = setup_decoder(true).await;
        let msg = load_test_msg("uniswap_v2_snapshot_broken_id");

        match decoder.decode(&msg).await {
            Err(StreamDecodeError::Fatal(msg)) => {
                assert_eq!(msg, "Component id mismatch");
            }
            Ok(_) => {
                panic!("Expected failures to be raised")
            }
        }
    }

    /// A snapshot with a broken state either fails the whole decode or is skipped,
    /// depending on `skip_state_decode_failures`.
    #[rstest]
    #[case(true)]
    #[case(false)]
    #[tokio::test]
    async fn test_decode_component_bad_state(#[case] skip_failures: bool) {
        let mut decoder = setup_decoder(true).await;
        decoder.skip_state_decode_failures = skip_failures;

        let msg = load_test_msg("uniswap_v2_snapshot_broken_state");
        match decoder.decode(&msg).await {
            Err(StreamDecodeError::Fatal(msg)) => {
                if !skip_failures {
                    assert_eq!(msg, "Missing attributes reserve0");
                } else {
                    panic!("Expected failures to be ignored. Err: {msg}")
                }
            }
            Ok(res) => {
                if !skip_failures {
                    panic!("Expected failures to be raised")
                } else {
                    assert_eq!(res.states.len(), 0);
                }
            }
        }
    }

    /// A contract update must trigger a `delta_transition` on the pools linked to that
    /// contract, even without an explicit state delta for the pool.
    #[tokio::test]
    async fn test_decode_updates_state_on_contract_change() {
        let decoder = setup_decoder(true).await;

        // Create the mock instances
        let mut mock_state = MockProtocolSim::new();

        // The stored state is expected to be cloned once; the transition is then expected
        // on the clone, not on the stored state.
        mock_state
            .expect_clone_box()
            .times(1)
            .returning(|| {
                let mut cloned_mock_state = MockProtocolSim::new();
                // Expect `delta_transition` to be called once with any parameters
                cloned_mock_state
                    .expect_delta_transition()
                    .times(1)
                    .returning(|_, _, _| Ok(()));
                // The transitioned clone is itself expected to be cloned exactly once.
                cloned_mock_state
                    .expect_clone_box()
                    .times(1)
                    .returning(|| Box::new(MockProtocolSim::new()));
                Box::new(cloned_mock_state)
            });

        // Insert the mock state into the decoder's persisted states and link the pool to
        // a contract address via `contracts_map`
        let pool_id =
            "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2".to_string();
        decoder
            .state
            .write()
            .await
            .states
            .insert(pool_id.clone(), Box::new(mock_state) as Box<dyn ProtocolSim>);
        decoder
            .state
            .write()
            .await
            .contracts_map
            .insert(
                Bytes::from("0xba12222222228d8ba445958a75a0704d566bf2c8").lpad(20, 0),
                HashSet::from([pool_id.clone()]),
            );

        // Load a test message containing a contract update
        let msg = load_test_msg("balancer_v2_delta");

        // Decode the message
        let _ = decoder
            .decode(&msg)
            .await
            .expect("decode failure");

        // The mock framework will assert that `delta_transition` was called exactly once
    }

    /// The generated address is the hex index immediately followed by `badbabe`,
    /// left-padded with zeroes to 20 bytes.
    #[test]
    fn test_generate_proxy_token_address() {
        let idx = 1;
        let generated_address =
            generate_proxy_token_address(idx).expect("proxy token address should be valid");
        assert_eq!(generated_address, address!("000000000000000000000000000000001badbabe"));

        // 123456 == 0x1e240
        let idx = 123456;
        let generated_address =
            generate_proxy_token_address(idx).expect("proxy token address should be valid");
        assert_eq!(generated_address, address!("00000000000000000000000000001e240badbabe"));
    }

    /// Regression test pinning the swap output of a pool decoded from the
    /// `euler_hook_snapshot` fixture.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_euler_hook_low_pool_manager_balance() {
        let mut decoder = TychoStreamDecoder::new();

        decoder.register_decoder_with_context::<crate::evm::protocol::uniswap_v4::state::UniswapV4State>(
            "uniswap_v4_hooks", DecoderContext::new().vm_traces(true)
        );

        let weth = Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap();
        let teth = Bytes::from_str("0xd11c452fc99cf405034ee446803b6f6c1f6d5ed8").unwrap();
        let tokens = HashMap::from([
            (
                weth.clone(),
                Token::new(&weth, "WETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
            ),
            (
                teth.clone(),
                Token::new(&teth, "tETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
            ),
        ]);

        decoder.set_tokens(tokens.clone()).await;

        let msg = load_test_msg("euler_hook_snapshot");
        let res = decoder
            .decode(&msg)
            .await
            .expect("decode failure");

        let pool_state = res
            .states
            .get("0xc70d7fbd7fcccdf726e02fed78548b40dc52502b097c7a1ee7d995f4d4396134")
            .expect("Couldn't find target pool");
        // Swap 1e18 units of tETH for WETH
        let amount_out = pool_state
            .get_amount_out(
                BigUint::from_str("1000000000000000000").unwrap(),
                tokens.get(&teth).unwrap(),
                tokens.get(&weth).unwrap(),
            )
            .expect("Get amount out failed");

        assert_eq!(amount_out.amount, BigUint::from_str("1216190190361759119").unwrap());
    }
}