Skip to main content

tycho_simulation/evm/
decoder.rs

1use std::{
2    collections::{hash_map::Entry, HashMap, HashSet},
3    future::Future,
4    pin::Pin,
5    sync::Arc,
6};
7
8use alloy::primitives::{Address, U256};
9use thiserror::Error;
10use tokio::sync::{RwLock, RwLockReadGuard};
11use tracing::{debug, error, info, warn};
12use tycho_client::feed::{synchronizer::ComponentWithState, BlockHeader, FeedMessage, HeaderLike};
13use tycho_common::{
14    dto::{ChangeType, ProtocolStateDelta},
15    models::{token::Token, Chain},
16    simulation::protocol_sim::{Balances, ProtocolSim},
17    Bytes,
18};
19#[cfg(test)]
20use {
21    mockall::mock,
22    num_bigint::BigUint,
23    std::any::Any,
24    tycho_common::simulation::{
25        errors::{SimulationError, TransitionError},
26        protocol_sim::GetAmountOutResult,
27    },
28};
29
30use crate::{
31    evm::{
32        engine_db::{update_engine, SHARED_TYCHO_DB},
33        protocol::{
34            utils::bytes_to_address,
35            vm::{constants::ERC20_PROXY_BYTECODE, erc20_token::IMPLEMENTATION_SLOT},
36        },
37        tycho_models::{AccountUpdate, ResponseAccount},
38    },
39    protocol::{
40        errors::InvalidSnapshotError,
41        models::{DecoderContext, ProtocolComponent, TryFromWithBlock, Update},
42    },
43};
44
45#[derive(Error, Debug)]
46pub enum StreamDecodeError {
47    #[error("{0}")]
48    Fatal(String),
49}
50
51#[derive(Default)]
52struct DecoderState {
53    tokens: HashMap<Bytes, Token>,
54    states: HashMap<String, Box<dyn ProtocolSim>>,
55    components: HashMap<String, ProtocolComponent>,
56    // maps contract address to the pools they affect
57    contracts_map: HashMap<Bytes, HashSet<String>>,
58    // Maps original token address to their new proxy token address
59    proxy_token_addresses: HashMap<Address, Address>,
60    // Set of failed components, these are components that failed to decode and will not be emitted
61    // again TODO: handle more gracefully inside tycho-client. We could fetch the snapshot and
62    // try to decode it again.
63    failed_components: HashSet<String>,
64}
65
66type DecodeFut =
67    Pin<Box<dyn Future<Output = Result<Box<dyn ProtocolSim>, InvalidSnapshotError>> + Send + Sync>>;
68type AccountBalances = HashMap<Bytes, HashMap<Bytes, Bytes>>;
69type RegistryFn<H> = dyn Fn(ComponentWithState, H, AccountBalances, Arc<RwLock<DecoderState>>) -> DecodeFut
70    + Send
71    + Sync;
72type FilterFn = fn(&ComponentWithState) -> bool;
73
74/// A decoder to process raw messages.
75///
76/// This struct decodes incoming messages of type `FeedMessage` and converts it into the
77/// `BlockUpdate` struct.
78///
79/// # Important:
80/// - Supports registering exchanges and their associated filters for specific protocol components.
81/// - Allows the addition of client-side filters for custom conditions.
82///
83/// **Note:** The tokens provided during configuration will be used for decoding, ensuring
84/// efficient handling of protocol components. Protocol components containing tokens which are not
85/// included in this initial list, or added when applying deltas, will not be decoded.
86pub struct TychoStreamDecoder<H>
87where
88    H: HeaderLike,
89{
90    state: Arc<RwLock<DecoderState>>,
91    skip_state_decode_failures: bool,
92    min_token_quality: u32,
93    registry: HashMap<String, Box<RegistryFn<H>>>,
94    inclusion_filters: HashMap<String, FilterFn>,
95}
96
97impl<H> Default for TychoStreamDecoder<H>
98where
99    H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
100{
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106impl<H> TychoStreamDecoder<H>
107where
108    H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
109{
110    pub fn new() -> Self {
111        Self {
112            state: Arc::new(RwLock::new(DecoderState::default())),
113            skip_state_decode_failures: false,
114            min_token_quality: 100,
115            registry: HashMap::new(),
116            inclusion_filters: HashMap::new(),
117        }
118    }
119
120    /// Sets the currently known tokens which will be considered during decoding.
121    ///
122    /// Protocol components containing tokens which are not included in this initial list, or
123    /// added when applying deltas, will not be decoded.
124    pub async fn set_tokens(&self, tokens: HashMap<Bytes, Token>) {
125        let mut guard = self.state.write().await;
126        guard.tokens = tokens;
127    }
128
129    pub fn skip_state_decode_failures(&mut self, skip: bool) {
130        self.skip_state_decode_failures = skip;
131    }
132
133    /// Sets the minimum token quality for decoding.
134    ///
135    /// Tokens arriving in stream deltas below this threshold are ignored. Defaults to 100.
136    /// Set this to the same value used in [`load_all_tokens`] to apply consistent filtering.
137    pub fn min_token_quality(&mut self, quality: u32) {
138        self.min_token_quality = quality;
139    }
140
141    /// Registers a decoder for a given exchange with a decoder context.
142    ///
143    /// This method maps an exchange identifier to a specific protocol simulation type.
144    /// The associated type must implement the `TryFromWithBlock` trait to enable decoding
145    /// of state updates from `ComponentWithState` objects. This allows the decoder to transform
146    /// the component data into the appropriate protocol simulation type based on the current
147    /// blockchain state and the provided block header.
148    /// For example, to register a decoder for the `uniswap_v2` exchange with an additional decoder
149    /// context, you must call this function with
150    /// `register_decoder_with_context::<UniswapV2State>("uniswap_v2", context)`.
151    /// This ensures that the exchange ID `uniswap_v2` is properly associated with the
152    /// `UniswapV2State` decoder for use in the protocol stream.
153    pub fn register_decoder_with_context<T>(&mut self, exchange: &str, context: DecoderContext)
154    where
155        T: ProtocolSim
156            + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
157            + Send
158            + 'static,
159    {
160        let decoder = Box::new(
161            move |component: ComponentWithState,
162                  header: H,
163                  account_balances: AccountBalances,
164                  state: Arc<RwLock<DecoderState>>| {
165                let context = context.clone();
166                Box::pin(async move {
167                    let guard = state.read().await;
168                    T::try_from_with_header(
169                        component,
170                        header,
171                        &account_balances,
172                        &guard.tokens,
173                        &context,
174                    )
175                    .await
176                    .map(|c| Box::new(c) as Box<dyn ProtocolSim>)
177                }) as DecodeFut
178            },
179        );
180        self.registry
181            .insert(exchange.to_string(), decoder);
182    }
183
184    /// Registers a decoder for a given exchange.
185    ///
186    /// This method maps an exchange identifier to a specific protocol simulation type.
187    /// The associated type must implement the `TryFromWithBlock` trait to enable decoding
188    /// of state updates from `ComponentWithState` objects. This allows the decoder to transform
189    /// the component data into the appropriate protocol simulation type based on the current
190    /// blockchain state and the provided block header.
191    /// For example, to register a decoder for the `uniswap_v2` exchange, you must call
192    /// this function with `register_decoder::<UniswapV2State>("uniswap_v2", vm_attributes)`.
193    /// This ensures that the exchange ID `uniswap_v2` is properly associated with the
194    /// `UniswapV2State` decoder for use in the protocol stream.
195    pub fn register_decoder<T>(&mut self, exchange: &str)
196    where
197        T: ProtocolSim
198            + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
199            + Send
200            + 'static,
201    {
202        let context = DecoderContext::new();
203        self.register_decoder_with_context::<T>(exchange, context);
204    }
205
206    /// Registers a client-side filter function for a given exchange.
207    ///
208    /// Associates a filter function with an exchange ID, enabling custom filtering of protocol
209    /// components. The filter function is applied client-side to refine the data received from the
210    /// stream. It can be used to exclude certain components based on attributes or conditions that
211    /// are not supported by the server-side filtering logic. This is particularly useful for
212    /// implementing custom behaviors, such as:
213    /// - Filtering out pools with specific attributes (e.g., unsupported features).
214    /// - Blacklisting pools based on custom criteria.
215    /// - Excluding pools that do not meet certain requirements (e.g., token pairs or liquidity
216    ///   constraints).
217    ///
218    /// For example, you might use a filter to exclude pools that are not fully supported in the
219    /// protocol, or to ignore pools with certain attributes that are irrelevant to your
220    /// application.
221    pub fn register_filter(&mut self, exchange: &str, predicate: FilterFn) {
222        self.inclusion_filters
223            .insert(exchange.to_string(), predicate);
224    }
225
226    /// Decodes a `FeedMessage` into a `BlockUpdate` containing the updated states of protocol
227    /// components
228    pub async fn decode(&self, msg: &FeedMessage<H>) -> Result<Update, StreamDecodeError> {
229        // stores all states updated in this tick/msg
230        let mut updated_states = HashMap::new();
231        let mut new_pairs = HashMap::new();
232        let mut removed_pairs = HashMap::new();
233        let mut contracts_map = HashMap::new();
234        let mut msg_failed_components = HashSet::new();
235
236        let header = msg
237            .state_msgs
238            .values()
239            .next()
240            .ok_or_else(|| StreamDecodeError::Fatal("Missing block!".into()))?
241            .header
242            .clone();
243
244        let block_number_or_timestamp = header
245            .clone()
246            .block_number_or_timestamp();
247        let current_block = header.clone().block();
248
249        for (protocol, protocol_msg) in msg.state_msgs.iter() {
250            // Add any new tokens
251            if let Some(deltas) = protocol_msg.deltas.as_ref() {
252                let mut state_guard = self.state.write().await;
253
254                let new_tokens = deltas
255                    .new_tokens
256                    .iter()
257                    .filter(|(addr, t)| {
258                        t.quality >= self.min_token_quality &&
259                            !state_guard.tokens.contains_key(*addr)
260                    })
261                    .filter_map(|(addr, t)| {
262                        t.clone()
263                            .try_into()
264                            .map(|token| (addr.clone(), token))
265                            .inspect_err(|e| {
266                                warn!("Failed decoding token {e:?} {addr:#044x}");
267                                *e
268                            })
269                            .ok()
270                    })
271                    .collect::<HashMap<Bytes, Token>>();
272
273                if !new_tokens.is_empty() {
274                    debug!(n = new_tokens.len(), "NewTokens");
275                    state_guard.tokens.extend(new_tokens);
276                }
277            }
278
279            // Remove untracked components
280            {
281                let mut state_guard = self.state.write().await;
282                let removed_components: Vec<(String, ProtocolComponent)> = protocol_msg
283                    .removed_components
284                    .iter()
285                    .map(|(id, comp)| {
286                        if *id != comp.id {
287                            error!(
288                                "Component id mismatch in removed components {id} != {}",
289                                comp.id
290                            );
291                            return Err(StreamDecodeError::Fatal("Component id mismatch".into()));
292                        }
293
294                        let tokens = comp
295                            .tokens
296                            .iter()
297                            .flat_map(|addr| state_guard.tokens.get(addr).cloned())
298                            .collect::<Vec<_>>();
299
300                        if tokens.len() == comp.tokens.len() {
301                            Ok(Some((
302                                id.clone(),
303                                ProtocolComponent::from_with_tokens(comp.clone(), tokens),
304                            )))
305                        } else {
306                            Ok(None)
307                        }
308                    })
309                    .collect::<Result<Vec<Option<(String, ProtocolComponent)>>, StreamDecodeError>>(
310                    )?
311                    .into_iter()
312                    .flatten()
313                    .collect();
314
315                // Remove components from state and add to removed_pairs
316                for (id, component) in removed_components {
317                    state_guard.components.remove(&id);
318                    state_guard.states.remove(&id);
319                    removed_pairs.insert(id, component);
320                }
321
322                // UPDATE VM STORAGE
323                info!(
324                    "Processing {} contracts from snapshots",
325                    protocol_msg
326                        .snapshots
327                        .get_vm_storage()
328                        .len()
329                );
330
331                let mut proxy_token_accounts: HashMap<Address, AccountUpdate> = HashMap::new();
332                let mut storage_by_address: HashMap<Address, ResponseAccount> = HashMap::new();
333                for (key, value) in protocol_msg
334                    .snapshots
335                    .get_vm_storage()
336                    .iter()
337                {
338                    let account: ResponseAccount = value.clone().into();
339
340                    if state_guard.tokens.contains_key(key) {
341                        let original_address = account.address;
342                        // To work with Tycho's token overwrites system, if we get account
343                        // snapshots for a token we must handle them with a proxy/wrapper
344                        // contract.
345                        // Note: storage for the original contract must be set at the proxy
346                        // contract address. This is because the proxy contract uses
347                        // delegatecall to the original (implementation) contract.
348
349                        // Handle proxy token accounts
350                        let (impl_addr, proxy_state) = match state_guard
351                            .proxy_token_addresses
352                            .get(&original_address)
353                        {
354                            Some(impl_addr) => {
355                                // Token already has a proxy contract, simply update it.
356
357                                // Note: we apply the snapshot as an update. This is to cover the
358                                // case where a contract may be stale as it stopped being tracked
359                                // for some reason (e.g. due to a drop in tvl) and is now being
360                                // tracked again.
361                                let proxy_state = AccountUpdate::new(
362                                    original_address,
363                                    value.chain.into(),
364                                    account.slots.clone(),
365                                    Some(account.native_balance),
366                                    None,
367                                    ChangeType::Update,
368                                );
369                                (*impl_addr, proxy_state)
370                            }
371                            None => {
372                                // Token does not have a proxy contract yet, create one
373
374                                // Assign original token contract to new address
375                                let impl_addr = generate_proxy_token_address(
376                                    state_guard.proxy_token_addresses.len() as u32,
377                                )?;
378                                state_guard
379                                    .proxy_token_addresses
380                                    .insert(original_address, impl_addr);
381
382                                // Add proxy token contract at original token address
383                                let proxy_state = create_proxy_token_account(
384                                    original_address,
385                                    Some(impl_addr),
386                                    &account.slots,
387                                    value.chain.into(),
388                                    Some(account.native_balance),
389                                );
390
391                                (impl_addr, proxy_state)
392                            }
393                        };
394
395                        proxy_token_accounts.insert(original_address, proxy_state);
396
397                        // Assign original token contract to the implementation address
398                        let impl_update = ResponseAccount {
399                            address: impl_addr,
400                            slots: HashMap::new(),
401                            ..account.clone()
402                        };
403                        storage_by_address.insert(impl_addr, impl_update);
404                    } else {
405                        // Not a token, apply snapshot to the account at its original address
406                        storage_by_address.insert(account.address, account);
407                    }
408                }
409
410                // Split proxy accounts by change type:
411                // - Creation: new proxies that must overwrite any existing placeholder
412                // - Update: existing proxies whose storage is being refreshed (handled normally)
413                let mut proxy_creates: Vec<AccountUpdate> = Vec::new();
414                let mut proxy_updates: HashMap<Address, AccountUpdate> = HashMap::new();
415                for (addr, update) in proxy_token_accounts {
416                    if matches!(update.change, ChangeType::Creation) {
417                        proxy_creates.push(update);
418                    } else {
419                        proxy_updates.insert(addr, update);
420                    }
421                }
422
423                info!("Updating engine with {} contracts from snapshots", storage_by_address.len());
424                update_engine(
425                    SHARED_TYCHO_DB.clone(),
426                    header.clone().block(),
427                    Some(storage_by_address),
428                    proxy_updates,
429                )
430                .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
431
432                // Force-overwrite new proxy token accounts so that authoritative vm_storage data
433                // always wins over any empty placeholder previously inserted by another
434                // decoder's snapshot loop (which uses init_account / init-if-not-exists).
435                if !proxy_creates.is_empty() {
436                    SHARED_TYCHO_DB
437                        .force_update_accounts(proxy_creates)
438                        .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
439                }
440                info!("Engine updated");
441                drop(state_guard);
442            }
443
444            // Construct a contract to token balances map: HashMap<ContractAddress,
445            // HashMap<TokenAddress, Balance>>
446            let account_balances = protocol_msg
447                .clone()
448                .snapshots
449                .get_vm_storage()
450                .iter()
451                .filter_map(|(addr, acc)| {
452                    let balances = acc.token_balances.clone();
453                    if balances.is_empty() {
454                        return None;
455                    }
456                    Some((addr.clone(), balances))
457                })
458                .collect::<AccountBalances>();
459
460            let mut new_components = HashMap::new();
461            let mut count_token_skips = 0;
462            let mut components_to_store = HashMap::new();
463            {
464                let state_guard = self.state.read().await;
465
466                // PROCESS SNAPSHOTS
467                'snapshot_loop: for (id, snapshot) in protocol_msg
468                    .snapshots
469                    .get_states()
470                    .clone()
471                {
472                    // Skip any unsupported pools
473                    if self
474                        .inclusion_filters
475                        .get(protocol.as_str())
476                        .is_some_and(|predicate| !predicate(&snapshot))
477                    {
478                        continue;
479                    }
480
481                    // Construct component from snapshot
482                    let mut component_tokens = Vec::new();
483                    let mut new_tokens_accounts = HashMap::new();
484                    for token in snapshot.component.tokens.clone() {
485                        match state_guard.tokens.get(&token) {
486                            Some(token) => {
487                                component_tokens.push(token.clone());
488
489                                // If the token is not an existing proxy token, we need to add it to
490                                // the simulation engine
491                                let token_address = match bytes_to_address(&token.address) {
492                                    Ok(addr) => addr,
493                                    Err(_) => {
494                                        count_token_skips += 1;
495                                        msg_failed_components.insert(id.clone());
496                                        warn!(
497                                            "Token address could not be decoded {}, ignoring pool {:x?}",
498                                            token.address, id
499                                        );
500                                        continue 'snapshot_loop;
501                                    }
502                                };
503                                // Deploy a proxy account without an implementation set
504                                if !state_guard
505                                    .proxy_token_addresses
506                                    .contains_key(&token_address)
507                                {
508                                    new_tokens_accounts.insert(
509                                        token_address,
510                                        create_proxy_token_account(
511                                            token_address,
512                                            None,
513                                            &HashMap::new(),
514                                            snapshot.component.chain.into(),
515                                            None,
516                                        ),
517                                    );
518                                }
519                            }
520                            None => {
521                                count_token_skips += 1;
522                                msg_failed_components.insert(id.clone());
523                                debug!("Token not found {}, ignoring pool {:x?}", token, id);
524                                continue 'snapshot_loop;
525                            }
526                        }
527                    }
528                    let component = ProtocolComponent::from_with_tokens(
529                        snapshot.component.clone(),
530                        component_tokens,
531                    );
532
533                    // Add new tokens to the simulation engine
534                    if !new_tokens_accounts.is_empty() {
535                        update_engine(
536                            SHARED_TYCHO_DB.clone(),
537                            header.clone().block(),
538                            None,
539                            new_tokens_accounts,
540                        )
541                        .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
542                    }
543
544                    // collect contracts:ids mapping for states that should update on contract
545                    // changes (non-manual updates)
546                    if !component
547                        .static_attributes
548                        .contains_key("manual_updates")
549                    {
550                        for contract in &component.contract_ids {
551                            contracts_map
552                                .entry(contract.clone())
553                                .or_insert_with(HashSet::new)
554                                .insert(id.clone());
555                        }
556                        // Add DCI contracts so changes to these contracts trigger
557                        // an update
558                        for (_, tracing) in snapshot.entrypoints.iter() {
559                            for contract in tracing.accessed_slots.keys().cloned() {
560                                contracts_map
561                                    .entry(contract)
562                                    .or_insert_with(HashSet::new)
563                                    .insert(id.clone());
564                            }
565                        }
566                    }
567
568                    // Collect new pairs (components)
569                    new_pairs.insert(id.clone(), component.clone());
570
571                    // Store component for later batch insertion
572                    components_to_store.insert(id.clone(), component);
573
574                    // Construct state from snapshot
575                    if let Some(state_decode_f) = self.registry.get(protocol.as_str()) {
576                        match state_decode_f(
577                            snapshot,
578                            header.clone(),
579                            account_balances.clone(),
580                            self.state.clone(),
581                        )
582                        .await
583                        {
584                            Ok(state) => {
585                                new_components.insert(id.clone(), state);
586                            }
587                            Err(e) => {
588                                if self.skip_state_decode_failures {
589                                    warn!(pool = id, error = %e, "StateDecodingFailure");
590                                    msg_failed_components.insert(id.clone());
591                                    continue 'snapshot_loop;
592                                } else {
593                                    error!(pool = id, error = %e, "StateDecodingFailure");
594                                    return Err(StreamDecodeError::Fatal(format!("{e}")));
595                                }
596                            }
597                        }
598                    } else if self.skip_state_decode_failures {
599                        warn!(pool = id, "MissingDecoderRegistration");
600                        msg_failed_components.insert(id.clone());
601                        continue 'snapshot_loop;
602                    } else {
603                        error!(pool = id, "MissingDecoderRegistration");
604                        return Err(StreamDecodeError::Fatal(format!(
605                            "Missing decoder registration for: {id}"
606                        )));
607                    }
608                }
609            }
610
611            // Batch insert components into state
612            if !components_to_store.is_empty() {
613                let mut state_guard = self.state.write().await;
614                for (id, component) in components_to_store {
615                    state_guard
616                        .components
617                        .insert(id, component);
618                }
619            }
620
621            if !protocol_msg.snapshots.states.is_empty() {
622                info!("Decoded {} snapshots for protocol {protocol}", new_components.len());
623            }
624            if count_token_skips > 0 {
625                info!("Skipped {count_token_skips} pools due to missing tokens");
626            }
627
628            //TODO: should we remove failed components for new_components?
629            updated_states.extend(new_components);
630
631            // PROCESS DELTAS
632            if let Some(deltas) = protocol_msg.deltas.clone() {
633                // Update engine with account changes
634                let mut state_guard = self.state.write().await;
635
636                let mut account_update_by_address: HashMap<Address, AccountUpdate> = HashMap::new();
637                // New proxy token accounts that must overwrite any existing placeholder.
638                let mut new_proxy_accounts: Vec<AccountUpdate> = Vec::new();
639                for (key, value) in deltas.account_updates.iter() {
640                    let mut update: AccountUpdate = value.clone().into();
641
642                    // TEMP PATCH (ENG-4993)
643                    //
644                    // The indexer emits deltas without code marked as creations, which crashes
645                    // TychoDB. Until fixed, treat them as updates (since EVM code cannot be
646                    // deleted).
647                    if update.code.is_none() && matches!(update.change, ChangeType::Creation) {
648                        error!(
649                            update = ?update,
650                            "FaultyCreationDelta"
651                        );
652                        update.change = ChangeType::Update;
653                    }
654
655                    if state_guard.tokens.contains_key(key) {
656                        let original_address = update.address;
657                        // If the account is a token, we need to handle it with a proxy contract.
658                        // Storage updates apply to the proxy contract (at original address).
659                        // Code updates (if any) apply to the token implementation contract (at
660                        // impl_addr).
661
662                        // Handle proxy contract updates
663                        let impl_addr = match state_guard
664                            .proxy_token_addresses
665                            .get(&original_address)
666                        {
667                            Some(impl_addr) => {
668                                // Token already has a proxy contract.
669
670                                // Apply the storage update to proxy contract
671                                let proxy_update = AccountUpdate { code: None, ..update.clone() };
672                                account_update_by_address.insert(original_address, proxy_update);
673
674                                *impl_addr
675                            }
676                            None => {
677                                // Token does not have a proxy contract yet, create one
678
679                                // Assign original token (implementation) contract to new proxy
680                                // address
681                                let impl_addr = generate_proxy_token_address(
682                                    state_guard.proxy_token_addresses.len() as u32,
683                                )?;
684                                state_guard
685                                    .proxy_token_addresses
686                                    .insert(original_address, impl_addr);
687
688                                // Create proxy token account with original account's storage (at
689                                // original address). Track it separately so it can be
690                                // force-overwritten and win over any placeholder that another
691                                // decoder's snapshot loop may have written earlier.
692                                let proxy_state = create_proxy_token_account(
693                                    original_address,
694                                    Some(impl_addr),
695                                    &update.slots,
696                                    update.chain,
697                                    update.balance,
698                                );
699                                new_proxy_accounts.push(proxy_state);
700
701                                impl_addr
702                            }
703                        };
704
705                        // Apply code update to token implementation contract
706                        if update.code.is_some() {
707                            let impl_update = AccountUpdate {
708                                address: impl_addr,
709                                slots: HashMap::new(),
710                                ..update.clone()
711                            };
712                            account_update_by_address.insert(impl_addr, impl_update);
713                        }
714                    } else {
715                        // Not a token, apply update to the account at its original address
716                        account_update_by_address.insert(update.address, update);
717                    }
718                }
719                drop(state_guard);
720
721                let state_guard = self.state.read().await;
722                info!("Updating engine with {} contract deltas", deltas.account_updates.len());
723                update_engine(
724                    SHARED_TYCHO_DB.clone(),
725                    header.clone().block(),
726                    None,
727                    account_update_by_address,
728                )
729                .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
730
731                // Force-overwrite any newly-created proxy token accounts so they always win
732                // over placeholder entries inserted by other decoders' snapshot loops.
733                if !new_proxy_accounts.is_empty() {
734                    SHARED_TYCHO_DB
735                        .force_update_accounts(new_proxy_accounts)
736                        .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
737                }
738                info!("Engine updated");
739
740                // Collect all pools related to the updated accounts
741                let mut pools_to_update = HashSet::new();
742                for (account, _update) in deltas.account_updates {
743                    // get new pools related to the account updated
744                    pools_to_update.extend(
745                        contracts_map
746                            .get(&account)
747                            .cloned()
748                            .unwrap_or_default(),
749                    );
750                    // get existing pools related to the account updated
751                    pools_to_update.extend(
752                        state_guard
753                            .contracts_map
754                            .get(&account)
755                            .cloned()
756                            .unwrap_or_default(),
757                    );
758                }
759
760                // Collect all balance changes this block
761                let all_balances = Balances {
762                    component_balances: deltas
763                        .component_balances
764                        .iter()
765                        .map(|(pool_id, bals)| {
766                            let mut balances = HashMap::new();
767                            for (t, b) in &bals.0 {
768                                balances.insert(t.clone(), b.balance.clone());
769                            }
770                            pools_to_update.insert(pool_id.clone());
771                            (pool_id.clone(), balances)
772                        })
773                        .collect(),
774                    account_balances: deltas
775                        .account_balances
776                        .iter()
777                        .map(|(account, bals)| {
778                            let mut balances = HashMap::new();
779                            for (t, b) in bals {
780                                balances.insert(t.clone(), b.balance.clone());
781                            }
782                            pools_to_update.extend(
783                                contracts_map
784                                    .get(account)
785                                    .cloned()
786                                    .unwrap_or_default(),
787                            );
788                            (account.clone(), balances)
789                        })
790                        .collect(),
791                };
792
793                // update states with protocol state deltas (attribute changes etc.)
794                for (id, update) in deltas.state_updates {
795                    // TODO: is this needed?
796                    let update_with_block =
797                        Self::add_block_info_to_delta(update, current_block.clone());
798                    match Self::apply_update(
799                        &id,
800                        update_with_block,
801                        &mut updated_states,
802                        &state_guard,
803                        &all_balances,
804                    ) {
805                        Ok(_) => {
806                            pools_to_update.remove(&id);
807                        }
808                        Err(e) => {
809                            if self.skip_state_decode_failures {
810                                warn!(pool = id, error = %e, "Failed to apply state update, marking component as removed");
811                                // Remove from updated_states if it was there
812                                updated_states.remove(&id);
813                                // Try to get component from new_pairs first, then from state
814                                if let Some(component) = new_pairs.remove(&id) {
815                                    removed_pairs.insert(id.clone(), component);
816                                } else if let Some(component) = state_guard.components.get(&id) {
817                                    removed_pairs.insert(id.clone(), component.clone());
818                                } else {
819                                    // Component not found in new_pairs or state, this shouldn't
820                                    // happen
821                                    warn!(pool = id, "Component not found in new_pairs or state, cannot add to removed_pairs");
822                                }
823                                pools_to_update.remove(&id);
824
825                                // Add to failed components
826                                msg_failed_components.insert(id.clone());
827                            } else {
828                                return Err(e);
829                            }
830                        }
831                    }
832                }
833
834                // update remaining pools linked to updated contracts/updated balances
835                for pool in pools_to_update {
836                    // TODO: is this needed?
837                    let default_delta_with_block = Self::add_block_info_to_delta(
838                        ProtocolStateDelta::default(),
839                        current_block.clone(),
840                    );
841                    match Self::apply_update(
842                        &pool,
843                        default_delta_with_block,
844                        &mut updated_states,
845                        &state_guard,
846                        &all_balances,
847                    ) {
848                        Ok(_) => {}
849                        Err(e) => {
850                            if self.skip_state_decode_failures {
851                                warn!(pool = pool, error = %e, "Failed to apply contract/balance update, marking component as removed");
852                                // Remove from updated_states if it was there
853                                updated_states.remove(&pool);
854                                // Try to get component from new_pairs first, then from state
855                                if let Some(component) = new_pairs.remove(&pool) {
856                                    removed_pairs.insert(pool.clone(), component);
857                                } else if let Some(component) = state_guard.components.get(&pool) {
858                                    removed_pairs.insert(pool.clone(), component.clone());
859                                } else {
860                                    // Component not found in new_pairs or state, this shouldn't
861                                    // happen
862                                    warn!(pool = pool, "Component not found in new_pairs or state, cannot add to removed_pairs");
863                                }
864
865                                // Add to failed components
866                                msg_failed_components.insert(pool.clone());
867                            } else {
868                                return Err(e);
869                            }
870                        }
871                    }
872                }
873            };
874        }
875
876        // Persist the newly added/updated states
877        let mut state_guard = self.state.write().await;
878
879        // Update failed components with any new ones
880        state_guard
881            .failed_components
882            .extend(msg_failed_components);
883
884        // Remove any failed components from Updates
885        // Perf: we could do it directly in the decoder logic to avoid some steps, but this logic is
886        // complex and this is more robust.
887        updated_states.retain(|id, _| {
888            !state_guard
889                .failed_components
890                .contains(id)
891        });
892        new_pairs.retain(|id, _| {
893            !state_guard
894                .failed_components
895                .contains(id)
896        });
897
898        state_guard
899            .states
900            .extend(updated_states.clone().into_iter());
901
902        // Add new components to persistent state
903        for (id, component) in new_pairs.iter() {
904            state_guard
905                .components
906                .insert(id.clone(), component.clone());
907        }
908
909        // Remove components from persistent state
910        for (id, _) in removed_pairs.iter() {
911            state_guard.components.remove(id);
912        }
913
914        for (key, values) in contracts_map {
915            state_guard
916                .contracts_map
917                .entry(key)
918                .or_insert_with(HashSet::new)
919                .extend(values);
920        }
921
922        // Send the tick with all updated states
923        Ok(Update::new(block_number_or_timestamp, updated_states, new_pairs)
924            .set_removed_pairs(removed_pairs)
925            .set_sync_states(msg.sync_states.clone()))
926    }
927
928    /// Add block information (number and timestamp) to a ProtocolStateDelta
929    fn add_block_info_to_delta(
930        mut delta: ProtocolStateDelta,
931        block_header_opt: Option<BlockHeader>,
932    ) -> ProtocolStateDelta {
933        if let Some(header) = block_header_opt {
934            // Add block_number and block_timestamp attributes to ensure pool states
935            // receive current block information during delta_transition
936            delta.updated_attributes.insert(
937                "block_number".to_string(),
938                Bytes::from(header.number.to_be_bytes().to_vec()),
939            );
940            delta.updated_attributes.insert(
941                "block_timestamp".to_string(),
942                Bytes::from(header.timestamp.to_be_bytes().to_vec()),
943            );
944        }
945        delta
946    }
947
948    fn apply_update(
949        id: &String,
950        update: ProtocolStateDelta,
951        updated_states: &mut HashMap<String, Box<dyn ProtocolSim>>,
952        state_guard: &RwLockReadGuard<'_, DecoderState>,
953        all_balances: &Balances,
954    ) -> Result<(), StreamDecodeError> {
955        match updated_states.entry(id.clone()) {
956            Entry::Occupied(mut entry) => {
957                // If state exists in updated_states, apply the delta to it
958                let state: &mut Box<dyn ProtocolSim> = entry.get_mut();
959                state
960                    .delta_transition(update, &state_guard.tokens, all_balances)
961                    .map_err(|e| {
962                        error!(pool = id, error = ?e, "DeltaTransitionError");
963                        StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
964                    })?;
965            }
966            Entry::Vacant(_) => {
967                match state_guard.states.get(id) {
968                    // If state does not exist in updated_states, apply the delta to the stored
969                    // state
970                    Some(stored_state) => {
971                        let mut state = stored_state.clone();
972                        state
973                            .delta_transition(update, &state_guard.tokens, all_balances)
974                            .map_err(|e| {
975                                error!(pool = id, error = ?e, "DeltaTransitionError");
976                                StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
977                            })?;
978                        updated_states.insert(id.clone(), state);
979                    }
980                    None => debug!(pool = id, reason = "MissingState", "DeltaTransitionError"),
981                }
982            }
983        }
984        Ok(())
985    }
986}
987
988/// Generate a proxy token address for a given token index
989fn generate_proxy_token_address(idx: u32) -> Result<Address, StreamDecodeError> {
990    let padded_idx = format!("{idx:x}");
991    let padded_zeroes = "0".repeat(33 - padded_idx.len());
992    let proxy_token_address = format!("{padded_zeroes}{padded_idx}BAdbaBe");
993    let decoded = hex::decode(proxy_token_address).map_err(|e| {
994        StreamDecodeError::Fatal(format!("Invalid proxy token address encoding: {e}"))
995    })?;
996
997    const ADDRESS_LENGTH: usize = 20;
998    if decoded.len() != ADDRESS_LENGTH {
999        return Err(StreamDecodeError::Fatal(format!(
1000            "Invalid proxy token address length: expected {}, got {}",
1001            ADDRESS_LENGTH,
1002            decoded.len(),
1003        )));
1004    }
1005
1006    Ok(Address::from_slice(&decoded))
1007}
1008
1009/// Create a proxy token account for a token at a given address
1010///
1011/// The proxy token account is created at the original token address and points to the new token
1012/// address.
1013fn create_proxy_token_account(
1014    addr: Address,
1015    new_address: Option<Address>,
1016    storage: &HashMap<U256, U256>,
1017    chain: Chain,
1018    balance: Option<U256>,
1019) -> AccountUpdate {
1020    let mut slots = storage.clone();
1021    if let Some(new_address) = new_address {
1022        slots.insert(*IMPLEMENTATION_SLOT, U256::from_be_slice(new_address.as_slice()));
1023    }
1024
1025    AccountUpdate {
1026        address: addr,
1027        chain,
1028        slots,
1029        balance,
1030        code: Some(ERC20_PROXY_BYTECODE.to_vec()),
1031        change: ChangeType::Creation,
1032    }
1033}
1034
// Test-only mock of the `ProtocolSim` interface. The `mock!` invocation below declares a
// mock named after `ProtocolSim` with one inherent method per signature listed; the rest of
// this file refers to the generated type as `MockProtocolSim` and programs it through
// `expect_*` calls (e.g. `expect_clone_box`, `expect_delta_transition`).
1035#[cfg(test)]
1036mock! {
1037    #[derive(Debug)]
1038    pub ProtocolSim {
1039        pub fn fee(&self) -> f64;
1040        pub fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError>;
1041        pub fn get_amount_out(
1042            &self,
1043            amount_in: BigUint,
1044            token_in: &Token,
1045            token_out: &Token,
1046        ) -> Result<GetAmountOutResult, SimulationError>;
1047        pub fn get_limits(
1048            &self,
1049            sell_token: Bytes,
1050            buy_token: Bytes,
1051        ) -> Result<(BigUint, BigUint), SimulationError>;
1052        pub fn delta_transition(
1053            &mut self,
1054            delta: ProtocolStateDelta,
1055            tokens: &HashMap<Bytes, Token>,
1056            balances: &Balances,
1057        ) -> Result<(), TransitionError>;
1058        pub fn clone_box(&self) -> Box<dyn ProtocolSim>;
1059        pub fn eq(&self, other: &dyn ProtocolSim) -> bool;
1060    }
1061}
1062
// NOTE(review): presumably supplies the serialization-related items for `MockProtocolSim`,
// labelled "test protocol"; the macro is defined elsewhere in the crate — confirm there.
1063#[cfg(test)]
1064crate::impl_non_serializable_protocol!(MockProtocolSim, "test protocol");
1065
/// Lets `MockProtocolSim` be used wherever a `ProtocolSim` is expected.
///
/// Each mocked trait method forwards to the inherent method of the same name declared in the
/// `mock!` block, so expectations set through `expect_*` drive the trait behaviour. The
/// inherent methods are called by explicit path to make clear that the calls do not recurse
/// into this trait impl.
#[cfg(test)]
impl ProtocolSim for MockProtocolSim {
    fn fee(&self) -> f64 {
        MockProtocolSim::fee(self)
    }

    fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError> {
        MockProtocolSim::spot_price(self, base, quote)
    }

    fn get_amount_out(
        &self,
        amount_in: BigUint,
        token_in: &Token,
        token_out: &Token,
    ) -> Result<GetAmountOutResult, SimulationError> {
        MockProtocolSim::get_amount_out(self, amount_in, token_in, token_out)
    }

    fn get_limits(
        &self,
        sell_token: Bytes,
        buy_token: Bytes,
    ) -> Result<(BigUint, BigUint), SimulationError> {
        MockProtocolSim::get_limits(self, sell_token, buy_token)
    }

    fn delta_transition(
        &mut self,
        delta: ProtocolStateDelta,
        tokens: &HashMap<Bytes, Token>,
        balances: &Balances,
    ) -> Result<(), TransitionError> {
        MockProtocolSim::delta_transition(self, delta, tokens, balances)
    }

    fn clone_box(&self) -> Box<dyn ProtocolSim> {
        MockProtocolSim::clone_box(self)
    }

    // Downcasting is not supported by the mock; both accessors panic when called.
    fn as_any(&self) -> &dyn Any {
        panic!("MockProtocolSim does not support as_any")
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        panic!("MockProtocolSim does not support as_any_mut")
    }

    fn eq(&self, other: &dyn ProtocolSim) -> bool {
        MockProtocolSim::eq(self, other)
    }

    // The typetag hooks are treated as never reached for the mock.
    fn typetag_name(&self) -> &'static str {
        unreachable!()
    }

    fn typetag_deserialize(&self) {
        unreachable!()
    }
}
1126
1127#[cfg(test)]
1128mod tests {
1129    use std::{fs, path::Path, str::FromStr};
1130
1131    use alloy::primitives::address;
1132    use mockall::predicate::*;
1133    use rstest::*;
1134    use tycho_client::feed::BlockHeader;
1135    use tycho_common::{models::Chain, Bytes};
1136
1137    use super::*;
1138    use crate::evm::protocol::uniswap_v2::state::UniswapV2State;
1139
1140    async fn setup_decoder(set_tokens: bool) -> TychoStreamDecoder<BlockHeader> {
1141        let mut decoder = TychoStreamDecoder::new();
1142        decoder.register_decoder::<UniswapV2State>("uniswap_v2");
1143        if set_tokens {
1144            let tokens = [
1145                Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0),
1146                Bytes::from("0xdac17f958d2ee523a2206206994597c13d831ec7").lpad(20, 0),
1147            ]
1148            .iter()
1149            .map(|addr| {
1150                let addr_str = format!("{addr:x}");
1151                (
1152                    addr.clone(),
1153                    Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1154                )
1155            })
1156            .collect();
1157            decoder.set_tokens(tokens).await;
1158        }
1159        decoder
1160    }
1161
1162    fn load_test_msg(name: &str) -> FeedMessage<BlockHeader> {
1163        let project_root = env!("CARGO_MANIFEST_DIR");
1164        let asset_path = Path::new(project_root).join(format!("tests/assets/decoder/{name}.json"));
1165        let json_data = fs::read_to_string(asset_path).expect("Failed to read test asset");
1166        serde_json::from_str(&json_data).expect("Failed to deserialize FeedMsg json!")
1167    }
1168
1169    #[tokio::test]
1170    async fn test_decode() {
1171        let decoder = setup_decoder(true).await;
1172
1173        let msg = load_test_msg("uniswap_v2_snapshot");
1174        let res1 = decoder
1175            .decode(&msg)
1176            .await
1177            .expect("decode failure");
1178        let msg = load_test_msg("uniswap_v2_delta");
1179        let res2 = decoder
1180            .decode(&msg)
1181            .await
1182            .expect("decode failure");
1183
1184        assert_eq!(res1.states.len(), 1);
1185        assert_eq!(res2.states.len(), 1);
1186        assert_eq!(res1.sync_states.len(), 1);
1187        assert_eq!(res2.sync_states.len(), 1);
1188    }
1189
1190    #[tokio::test]
1191    async fn test_decode_component_missing_token() {
1192        let decoder = setup_decoder(false).await;
1193        let tokens = [Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0)]
1194            .iter()
1195            .map(|addr| {
1196                let addr_str = format!("{addr:x}");
1197                (
1198                    addr.clone(),
1199                    Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1200                )
1201            })
1202            .collect();
1203        decoder.set_tokens(tokens).await;
1204
1205        let msg = load_test_msg("uniswap_v2_snapshot");
1206        let res1 = decoder
1207            .decode(&msg)
1208            .await
1209            .expect("decode failure");
1210
1211        assert_eq!(res1.states.len(), 0);
1212    }
1213
1214    #[tokio::test]
1215    async fn test_decode_component_bad_id() {
1216        let decoder = setup_decoder(true).await;
1217        let msg = load_test_msg("uniswap_v2_snapshot_broken_id");
1218
1219        match decoder.decode(&msg).await {
1220            Err(StreamDecodeError::Fatal(msg)) => {
1221                assert_eq!(msg, "Component id mismatch");
1222            }
1223            Ok(_) => {
1224                panic!("Expected failures to be raised")
1225            }
1226        }
1227    }
1228
1229    #[rstest]
1230    #[case(true)]
1231    #[case(false)]
1232    #[tokio::test]
1233    async fn test_decode_component_bad_state(#[case] skip_failures: bool) {
1234        let mut decoder = setup_decoder(true).await;
1235        decoder.skip_state_decode_failures = skip_failures;
1236
1237        let msg = load_test_msg("uniswap_v2_snapshot_broken_state");
1238        match decoder.decode(&msg).await {
1239            Err(StreamDecodeError::Fatal(msg)) => {
1240                if !skip_failures {
1241                    assert_eq!(msg, "Missing attributes reserve0");
1242                } else {
1243                    panic!("Expected failures to be ignored. Err: {msg}")
1244                }
1245            }
1246            Ok(res) => {
1247                if !skip_failures {
1248                    panic!("Expected failures to be raised")
1249                } else {
1250                    assert_eq!(res.states.len(), 0);
1251                }
1252            }
1253        }
1254    }
1255
1256    #[tokio::test]
1257    async fn test_decode_updates_state_on_contract_change() {
1258        let decoder = setup_decoder(true).await;
1259
1260        // Create the mock instances
1261        let mut mock_state = MockProtocolSim::new();
1262
1263        mock_state
1264            .expect_clone_box()
1265            .times(1)
1266            .returning(|| {
1267                let mut cloned_mock_state = MockProtocolSim::new();
1268                // Expect `delta_transition` to be called once with any parameters
1269                cloned_mock_state
1270                    .expect_delta_transition()
1271                    .times(1)
1272                    .returning(|_, _, _| Ok(()));
1273                cloned_mock_state
1274                    .expect_clone_box()
1275                    .times(1)
1276                    .returning(|| Box::new(MockProtocolSim::new()));
1277                Box::new(cloned_mock_state)
1278            });
1279
1280        // Insert mock state into `updated_states`
1281        let pool_id =
1282            "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2".to_string();
1283        decoder
1284            .state
1285            .write()
1286            .await
1287            .states
1288            .insert(pool_id.clone(), Box::new(mock_state) as Box<dyn ProtocolSim>);
1289        decoder
1290            .state
1291            .write()
1292            .await
1293            .contracts_map
1294            .insert(
1295                Bytes::from("0xba12222222228d8ba445958a75a0704d566bf2c8").lpad(20, 0),
1296                HashSet::from([pool_id.clone()]),
1297            );
1298
1299        // Load a test message containing a contract update
1300        let msg = load_test_msg("balancer_v2_delta");
1301
1302        // Decode the message
1303        let _ = decoder
1304            .decode(&msg)
1305            .await
1306            .expect("decode failure");
1307
1308        // The mock framework will assert that `delta_transition` was called exactly once
1309    }
1310
1311    #[test]
1312    fn test_generate_proxy_token_address() {
1313        let idx = 1;
1314        let generated_address =
1315            generate_proxy_token_address(idx).expect("proxy token address should be valid");
1316        assert_eq!(generated_address, address!("000000000000000000000000000000001badbabe"));
1317
1318        let idx = 123456;
1319        let generated_address =
1320            generate_proxy_token_address(idx).expect("proxy token address should be valid");
1321        assert_eq!(generated_address, address!("00000000000000000000000000001e240badbabe"));
1322    }
1323
1324    #[tokio::test(flavor = "multi_thread")]
1325    async fn test_euler_hook_low_pool_manager_balance() {
1326        let mut decoder = TychoStreamDecoder::new();
1327
1328        decoder.register_decoder_with_context::<crate::evm::protocol::uniswap_v4::state::UniswapV4State>(
1329            "uniswap_v4_hooks", DecoderContext::new().vm_traces(true)
1330        );
1331
1332        let weth = Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap();
1333        let teth = Bytes::from_str("0xd11c452fc99cf405034ee446803b6f6c1f6d5ed8").unwrap();
1334        let tokens = HashMap::from([
1335            (
1336                weth.clone(),
1337                Token::new(&weth, "WETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1338            ),
1339            (
1340                teth.clone(),
1341                Token::new(&teth, "tETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1342            ),
1343        ]);
1344
1345        decoder.set_tokens(tokens.clone()).await;
1346
1347        let msg = load_test_msg("euler_hook_snapshot");
1348        let res = decoder
1349            .decode(&msg)
1350            .await
1351            .expect("decode failure");
1352
1353        let pool_state = res
1354            .states
1355            .get("0xc70d7fbd7fcccdf726e02fed78548b40dc52502b097c7a1ee7d995f4d4396134")
1356            .expect("Couldn't find target pool");
1357        let amount_out = pool_state
1358            .get_amount_out(
1359                BigUint::from_str("1000000000000000000").unwrap(),
1360                tokens.get(&teth).unwrap(),
1361                tokens.get(&weth).unwrap(),
1362            )
1363            .expect("Get amount out failed");
1364
1365        assert_eq!(amount_out.amount, BigUint::from_str("1216190190361759119").unwrap());
1366    }
1367}