// tycho_simulation/evm/decoder.rs

1use std::{
2    collections::{hash_map::Entry, HashMap, HashSet},
3    future::Future,
4    pin::Pin,
5    sync::Arc,
6};
7
8use alloy::primitives::{Address, U256};
9use thiserror::Error;
10use tokio::sync::{RwLock, RwLockReadGuard};
11use tracing::{debug, error, info, warn};
12use tycho_client::feed::{synchronizer::ComponentWithState, BlockHeader, FeedMessage, HeaderLike};
13use tycho_common::{
14    dto::{ChangeType, ProtocolStateDelta},
15    models::{token::Token, Chain},
16    simulation::protocol_sim::{Balances, ProtocolSim},
17    Bytes,
18};
19#[cfg(test)]
20use {
21    mockall::mock,
22    num_bigint::BigUint,
23    std::any::Any,
24    tycho_common::simulation::{
25        errors::{SimulationError, TransitionError},
26        protocol_sim::GetAmountOutResult,
27    },
28};
29
30use crate::{
31    evm::{
32        engine_db::{update_engine, SHARED_TYCHO_DB},
33        protocol::{
34            utils::bytes_to_address,
35            vm::{constants::ERC20_PROXY_BYTECODE, erc20_token::IMPLEMENTATION_SLOT},
36        },
37        tycho_models::{AccountUpdate, ResponseAccount},
38    },
39    protocol::{
40        errors::InvalidSnapshotError,
41        models::{DecoderContext, ProtocolComponent, TryFromWithBlock, Update},
42    },
43};
44
45#[derive(Error, Debug)]
46pub enum StreamDecodeError {
47    #[error("{0}")]
48    Fatal(String),
49}
50
51#[derive(Default)]
52struct DecoderState {
53    tokens: HashMap<Bytes, Token>,
54    states: HashMap<String, Box<dyn ProtocolSim>>,
55    components: HashMap<String, ProtocolComponent>,
56    // maps contract address to the pools they affect
57    contracts_map: HashMap<Bytes, HashSet<String>>,
58    // Maps original token address to their new proxy token address
59    proxy_token_addresses: HashMap<Address, Address>,
60    // Set of failed components, these are components that failed to decode and will not be emitted
61    // again TODO: handle more gracefully inside tycho-client. We could fetch the snapshot and
62    // try to decode it again.
63    failed_components: HashSet<String>,
64}
65
66type DecodeFut =
67    Pin<Box<dyn Future<Output = Result<Box<dyn ProtocolSim>, InvalidSnapshotError>> + Send + Sync>>;
68type AccountBalances = HashMap<Bytes, HashMap<Bytes, Bytes>>;
69type RegistryFn<H> = dyn Fn(ComponentWithState, H, AccountBalances, Arc<RwLock<DecoderState>>) -> DecodeFut
70    + Send
71    + Sync;
72type FilterFn = fn(&ComponentWithState) -> bool;
73
74/// A decoder to process raw messages.
75///
76/// This struct decodes incoming messages of type `FeedMessage` and converts it into the
77/// `BlockUpdate` struct.
78///
79/// # Important:
80/// - Supports registering exchanges and their associated filters for specific protocol components.
81/// - Allows the addition of client-side filters for custom conditions.
82///
83/// **Note:** The tokens provided during configuration will be used for decoding, ensuring
84/// efficient handling of protocol components. Protocol components containing tokens which are not
85/// included in this initial list, or added when applying deltas, will not be decoded.
86pub struct TychoStreamDecoder<H>
87where
88    H: HeaderLike,
89{
90    state: Arc<RwLock<DecoderState>>,
91    skip_state_decode_failures: bool,
92    min_token_quality: u32,
93    registry: HashMap<String, Box<RegistryFn<H>>>,
94    inclusion_filters: HashMap<String, FilterFn>,
95}
96
97impl<H> Default for TychoStreamDecoder<H>
98where
99    H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
100{
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106impl<H> TychoStreamDecoder<H>
107where
108    H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
109{
110    pub fn new() -> Self {
111        Self {
112            state: Arc::new(RwLock::new(DecoderState::default())),
113            skip_state_decode_failures: false,
114            min_token_quality: 51,
115            registry: HashMap::new(),
116            inclusion_filters: HashMap::new(),
117        }
118    }
119
120    /// Sets the currently known tokens which will be considered during decoding.
121    ///
122    /// Protocol components containing tokens which are not included in this initial list, or
123    /// added when applying deltas, will not be decoded.
124    pub async fn set_tokens(&self, tokens: HashMap<Bytes, Token>) {
125        let mut guard = self.state.write().await;
126        guard.tokens = tokens;
127    }
128
129    pub fn skip_state_decode_failures(&mut self, skip: bool) {
130        self.skip_state_decode_failures = skip;
131    }
132
133    /// Registers a decoder for a given exchange with a decoder context.
134    ///
135    /// This method maps an exchange identifier to a specific protocol simulation type.
136    /// The associated type must implement the `TryFromWithBlock` trait to enable decoding
137    /// of state updates from `ComponentWithState` objects. This allows the decoder to transform
138    /// the component data into the appropriate protocol simulation type based on the current
139    /// blockchain state and the provided block header.
140    /// For example, to register a decoder for the `uniswap_v2` exchange with an additional decoder
141    /// context, you must call this function with
142    /// `register_decoder_with_context::<UniswapV2State>("uniswap_v2", context)`.
143    /// This ensures that the exchange ID `uniswap_v2` is properly associated with the
144    /// `UniswapV2State` decoder for use in the protocol stream.
145    pub fn register_decoder_with_context<T>(&mut self, exchange: &str, context: DecoderContext)
146    where
147        T: ProtocolSim
148            + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
149            + Send
150            + 'static,
151    {
152        let decoder = Box::new(
153            move |component: ComponentWithState,
154                  header: H,
155                  account_balances: AccountBalances,
156                  state: Arc<RwLock<DecoderState>>| {
157                let context = context.clone();
158                Box::pin(async move {
159                    let guard = state.read().await;
160                    T::try_from_with_header(
161                        component,
162                        header,
163                        &account_balances,
164                        &guard.tokens,
165                        &context,
166                    )
167                    .await
168                    .map(|c| Box::new(c) as Box<dyn ProtocolSim>)
169                }) as DecodeFut
170            },
171        );
172        self.registry
173            .insert(exchange.to_string(), decoder);
174    }
175
176    /// Registers a decoder for a given exchange.
177    ///
178    /// This method maps an exchange identifier to a specific protocol simulation type.
179    /// The associated type must implement the `TryFromWithBlock` trait to enable decoding
180    /// of state updates from `ComponentWithState` objects. This allows the decoder to transform
181    /// the component data into the appropriate protocol simulation type based on the current
182    /// blockchain state and the provided block header.
183    /// For example, to register a decoder for the `uniswap_v2` exchange, you must call
184    /// this function with `register_decoder::<UniswapV2State>("uniswap_v2", vm_attributes)`.
185    /// This ensures that the exchange ID `uniswap_v2` is properly associated with the
186    /// `UniswapV2State` decoder for use in the protocol stream.
187    pub fn register_decoder<T>(&mut self, exchange: &str)
188    where
189        T: ProtocolSim
190            + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
191            + Send
192            + 'static,
193    {
194        let context = DecoderContext::new();
195        self.register_decoder_with_context::<T>(exchange, context);
196    }
197
198    /// Registers a client-side filter function for a given exchange.
199    ///
200    /// Associates a filter function with an exchange ID, enabling custom filtering of protocol
201    /// components. The filter function is applied client-side to refine the data received from the
202    /// stream. It can be used to exclude certain components based on attributes or conditions that
203    /// are not supported by the server-side filtering logic. This is particularly useful for
204    /// implementing custom behaviors, such as:
205    /// - Filtering out pools with specific attributes (e.g., unsupported features).
206    /// - Blacklisting pools based on custom criteria.
207    /// - Excluding pools that do not meet certain requirements (e.g., token pairs or liquidity
208    ///   constraints).
209    ///
210    /// For example, you might use a filter to exclude pools that are not fully supported in the
211    /// protocol, or to ignore pools with certain attributes that are irrelevant to your
212    /// application.
213    pub fn register_filter(&mut self, exchange: &str, predicate: FilterFn) {
214        self.inclusion_filters
215            .insert(exchange.to_string(), predicate);
216    }
217
218    /// Decodes a `FeedMessage` into a `BlockUpdate` containing the updated states of protocol
219    /// components
220    pub async fn decode(&self, msg: &FeedMessage<H>) -> Result<Update, StreamDecodeError> {
221        // stores all states updated in this tick/msg
222        let mut updated_states = HashMap::new();
223        let mut new_pairs = HashMap::new();
224        let mut removed_pairs = HashMap::new();
225        let mut contracts_map = HashMap::new();
226        let mut msg_failed_components = HashSet::new();
227
228        let header = msg
229            .state_msgs
230            .values()
231            .next()
232            .ok_or_else(|| StreamDecodeError::Fatal("Missing block!".into()))?
233            .header
234            .clone();
235
236        let block_number_or_timestamp = header
237            .clone()
238            .block_number_or_timestamp();
239        let current_block = header.clone().block();
240
241        for (protocol, protocol_msg) in msg.state_msgs.iter() {
242            // Add any new tokens
243            if let Some(deltas) = protocol_msg.deltas.as_ref() {
244                let mut state_guard = self.state.write().await;
245
246                let new_tokens = deltas
247                    .new_tokens
248                    .iter()
249                    .filter(|(addr, t)| {
250                        t.quality >= self.min_token_quality &&
251                            !state_guard.tokens.contains_key(*addr)
252                    })
253                    .filter_map(|(addr, t)| {
254                        t.clone()
255                            .try_into()
256                            .map(|token| (addr.clone(), token))
257                            .inspect_err(|e| {
258                                warn!("Failed decoding token {e:?} {addr:#044x}");
259                                *e
260                            })
261                            .ok()
262                    })
263                    .collect::<HashMap<Bytes, Token>>();
264
265                if !new_tokens.is_empty() {
266                    debug!(n = new_tokens.len(), "NewTokens");
267                    state_guard.tokens.extend(new_tokens);
268                }
269            }
270
271            // Remove untracked components
272            {
273                let mut state_guard = self.state.write().await;
274                let removed_components: Vec<(String, ProtocolComponent)> = protocol_msg
275                    .removed_components
276                    .iter()
277                    .map(|(id, comp)| {
278                        if *id != comp.id {
279                            error!(
280                                "Component id mismatch in removed components {id} != {}",
281                                comp.id
282                            );
283                            return Err(StreamDecodeError::Fatal("Component id mismatch".into()));
284                        }
285
286                        let tokens = comp
287                            .tokens
288                            .iter()
289                            .flat_map(|addr| state_guard.tokens.get(addr).cloned())
290                            .collect::<Vec<_>>();
291
292                        if tokens.len() == comp.tokens.len() {
293                            Ok(Some((
294                                id.clone(),
295                                ProtocolComponent::from_with_tokens(comp.clone(), tokens),
296                            )))
297                        } else {
298                            Ok(None)
299                        }
300                    })
301                    .collect::<Result<Vec<Option<(String, ProtocolComponent)>>, StreamDecodeError>>(
302                    )?
303                    .into_iter()
304                    .flatten()
305                    .collect();
306
307                // Remove components from state and add to removed_pairs
308                for (id, component) in removed_components {
309                    state_guard.components.remove(&id);
310                    state_guard.states.remove(&id);
311                    removed_pairs.insert(id, component);
312                }
313
314                // UPDATE VM STORAGE
315                info!(
316                    "Processing {} contracts from snapshots",
317                    protocol_msg
318                        .snapshots
319                        .get_vm_storage()
320                        .len()
321                );
322
323                let mut proxy_token_accounts: HashMap<Address, AccountUpdate> = HashMap::new();
324                let mut storage_by_address: HashMap<Address, ResponseAccount> = HashMap::new();
325                for (key, value) in protocol_msg
326                    .snapshots
327                    .get_vm_storage()
328                    .iter()
329                {
330                    let account: ResponseAccount = value.clone().into();
331
332                    if state_guard.tokens.contains_key(key) {
333                        let original_address = account.address;
334                        // To work with Tycho's token overwrites system, if we get account
335                        // snapshots for a token we must handle them with a proxy/wrapper
336                        // contract.
337                        // Note: storage for the original contract must be set at the proxy
338                        // contract address. This is because the proxy contract uses
339                        // delegatecall to the original (implementation) contract.
340
341                        // Handle proxy token accounts
342                        let (impl_addr, proxy_state) = match state_guard
343                            .proxy_token_addresses
344                            .get(&original_address)
345                        {
346                            Some(impl_addr) => {
347                                // Token already has a proxy contract, simply update it.
348
349                                // Note: we apply the snapshot as an update. This is to cover the
350                                // case where a contract may be stale as it stopped being tracked
351                                // for some reason (e.g. due to a drop in tvl) and is now being
352                                // tracked again.
353                                let proxy_state = AccountUpdate::new(
354                                    original_address,
355                                    value.chain.into(),
356                                    account.slots.clone(),
357                                    Some(account.native_balance),
358                                    None,
359                                    ChangeType::Update,
360                                );
361                                (*impl_addr, proxy_state)
362                            }
363                            None => {
364                                // Token does not have a proxy contract yet, create one
365
366                                // Assign original token contract to new address
367                                let impl_addr = generate_proxy_token_address(
368                                    state_guard.proxy_token_addresses.len() as u32,
369                                )?;
370                                state_guard
371                                    .proxy_token_addresses
372                                    .insert(original_address, impl_addr);
373
374                                // Add proxy token contract at original token address
375                                let proxy_state = create_proxy_token_account(
376                                    original_address,
377                                    Some(impl_addr),
378                                    &account.slots,
379                                    value.chain.into(),
380                                    Some(account.native_balance),
381                                );
382
383                                (impl_addr, proxy_state)
384                            }
385                        };
386
387                        proxy_token_accounts.insert(original_address, proxy_state);
388
389                        // Assign original token contract to the implementation address
390                        let impl_update = ResponseAccount {
391                            address: impl_addr,
392                            slots: HashMap::new(),
393                            ..account.clone()
394                        };
395                        storage_by_address.insert(impl_addr, impl_update);
396                    } else {
397                        // Not a token, apply snapshot to the account at its original address
398                        storage_by_address.insert(account.address, account);
399                    }
400                }
401
402                // Split proxy accounts by change type:
403                // - Creation: new proxies that must overwrite any existing placeholder
404                // - Update: existing proxies whose storage is being refreshed (handled normally)
405                let mut proxy_creates: Vec<AccountUpdate> = Vec::new();
406                let mut proxy_updates: HashMap<Address, AccountUpdate> = HashMap::new();
407                for (addr, update) in proxy_token_accounts {
408                    if matches!(update.change, ChangeType::Creation) {
409                        proxy_creates.push(update);
410                    } else {
411                        proxy_updates.insert(addr, update);
412                    }
413                }
414
415                info!("Updating engine with {} contracts from snapshots", storage_by_address.len());
416                update_engine(
417                    SHARED_TYCHO_DB.clone(),
418                    header.clone().block(),
419                    Some(storage_by_address),
420                    proxy_updates,
421                )
422                .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
423
424                // Force-overwrite new proxy token accounts so that authoritative vm_storage data
425                // always wins over any empty placeholder previously inserted by another
426                // decoder's snapshot loop (which uses init_account / init-if-not-exists).
427                if !proxy_creates.is_empty() {
428                    SHARED_TYCHO_DB
429                        .force_update_accounts(proxy_creates)
430                        .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
431                }
432                info!("Engine updated");
433                drop(state_guard);
434            }
435
436            // Construct a contract to token balances map: HashMap<ContractAddress,
437            // HashMap<TokenAddress, Balance>>
438            let account_balances = protocol_msg
439                .clone()
440                .snapshots
441                .get_vm_storage()
442                .iter()
443                .filter_map(|(addr, acc)| {
444                    let balances = acc.token_balances.clone();
445                    if balances.is_empty() {
446                        return None;
447                    }
448                    Some((addr.clone(), balances))
449                })
450                .collect::<AccountBalances>();
451
452            let mut new_components = HashMap::new();
453            let mut count_token_skips = 0;
454            let mut components_to_store = HashMap::new();
455            {
456                let state_guard = self.state.read().await;
457
458                // PROCESS SNAPSHOTS
459                'snapshot_loop: for (id, snapshot) in protocol_msg
460                    .snapshots
461                    .get_states()
462                    .clone()
463                {
464                    // Skip any unsupported pools
465                    if self
466                        .inclusion_filters
467                        .get(protocol.as_str())
468                        .is_some_and(|predicate| !predicate(&snapshot))
469                    {
470                        continue;
471                    }
472
473                    // Construct component from snapshot
474                    let mut component_tokens = Vec::new();
475                    let mut new_tokens_accounts = HashMap::new();
476                    for token in snapshot.component.tokens.clone() {
477                        match state_guard.tokens.get(&token) {
478                            Some(token) => {
479                                component_tokens.push(token.clone());
480
481                                // If the token is not an existing proxy token, we need to add it to
482                                // the simulation engine
483                                let token_address = match bytes_to_address(&token.address) {
484                                    Ok(addr) => addr,
485                                    Err(_) => {
486                                        count_token_skips += 1;
487                                        msg_failed_components.insert(id.clone());
488                                        warn!(
489                                            "Token address could not be decoded {}, ignoring pool {:x?}",
490                                            token.address, id
491                                        );
492                                        continue 'snapshot_loop;
493                                    }
494                                };
495                                // Deploy a proxy account without an implementation set
496                                if !state_guard
497                                    .proxy_token_addresses
498                                    .contains_key(&token_address)
499                                {
500                                    new_tokens_accounts.insert(
501                                        token_address,
502                                        create_proxy_token_account(
503                                            token_address,
504                                            None,
505                                            &HashMap::new(),
506                                            snapshot.component.chain.into(),
507                                            None,
508                                        ),
509                                    );
510                                }
511                            }
512                            None => {
513                                count_token_skips += 1;
514                                msg_failed_components.insert(id.clone());
515                                debug!("Token not found {}, ignoring pool {:x?}", token, id);
516                                continue 'snapshot_loop;
517                            }
518                        }
519                    }
520                    let component = ProtocolComponent::from_with_tokens(
521                        snapshot.component.clone(),
522                        component_tokens,
523                    );
524
525                    // Add new tokens to the simulation engine
526                    if !new_tokens_accounts.is_empty() {
527                        update_engine(
528                            SHARED_TYCHO_DB.clone(),
529                            header.clone().block(),
530                            None,
531                            new_tokens_accounts,
532                        )
533                        .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
534                    }
535
536                    // collect contracts:ids mapping for states that should update on contract
537                    // changes (non-manual updates)
538                    if !component
539                        .static_attributes
540                        .contains_key("manual_updates")
541                    {
542                        for contract in &component.contract_ids {
543                            contracts_map
544                                .entry(contract.clone())
545                                .or_insert_with(HashSet::new)
546                                .insert(id.clone());
547                        }
548                        // Add DCI contracts so changes to these contracts trigger
549                        // an update
550                        for (_, tracing) in snapshot.entrypoints.iter() {
551                            for contract in tracing.accessed_slots.keys().cloned() {
552                                contracts_map
553                                    .entry(contract)
554                                    .or_insert_with(HashSet::new)
555                                    .insert(id.clone());
556                            }
557                        }
558                    }
559
560                    // Collect new pairs (components)
561                    new_pairs.insert(id.clone(), component.clone());
562
563                    // Store component for later batch insertion
564                    components_to_store.insert(id.clone(), component);
565
566                    // Construct state from snapshot
567                    if let Some(state_decode_f) = self.registry.get(protocol.as_str()) {
568                        match state_decode_f(
569                            snapshot,
570                            header.clone(),
571                            account_balances.clone(),
572                            self.state.clone(),
573                        )
574                        .await
575                        {
576                            Ok(state) => {
577                                new_components.insert(id.clone(), state);
578                            }
579                            Err(e) => {
580                                if self.skip_state_decode_failures {
581                                    warn!(pool = id, error = %e, "StateDecodingFailure");
582                                    msg_failed_components.insert(id.clone());
583                                    continue 'snapshot_loop;
584                                } else {
585                                    error!(pool = id, error = %e, "StateDecodingFailure");
586                                    return Err(StreamDecodeError::Fatal(format!("{e}")));
587                                }
588                            }
589                        }
590                    } else if self.skip_state_decode_failures {
591                        warn!(pool = id, "MissingDecoderRegistration");
592                        msg_failed_components.insert(id.clone());
593                        continue 'snapshot_loop;
594                    } else {
595                        error!(pool = id, "MissingDecoderRegistration");
596                        return Err(StreamDecodeError::Fatal(format!(
597                            "Missing decoder registration for: {id}"
598                        )));
599                    }
600                }
601            }
602
603            // Batch insert components into state
604            if !components_to_store.is_empty() {
605                let mut state_guard = self.state.write().await;
606                for (id, component) in components_to_store {
607                    state_guard
608                        .components
609                        .insert(id, component);
610                }
611            }
612
613            if !protocol_msg.snapshots.states.is_empty() {
614                info!("Decoded {} snapshots for protocol {protocol}", new_components.len());
615            }
616            if count_token_skips > 0 {
617                info!("Skipped {count_token_skips} pools due to missing tokens");
618            }
619
620            //TODO: should we remove failed components for new_components?
621            updated_states.extend(new_components);
622
623            // PROCESS DELTAS
624            if let Some(deltas) = protocol_msg.deltas.clone() {
625                // Update engine with account changes
626                let mut state_guard = self.state.write().await;
627
628                let mut account_update_by_address: HashMap<Address, AccountUpdate> = HashMap::new();
629                // New proxy token accounts that must overwrite any existing placeholder.
630                let mut new_proxy_accounts: Vec<AccountUpdate> = Vec::new();
631                for (key, value) in deltas.account_updates.iter() {
632                    let mut update: AccountUpdate = value.clone().into();
633
634                    // TEMP PATCH (ENG-4993)
635                    //
636                    // The indexer emits deltas without code marked as creations, which crashes
637                    // TychoDB. Until fixed, treat them as updates (since EVM code cannot be
638                    // deleted).
639                    if update.code.is_none() && matches!(update.change, ChangeType::Creation) {
640                        error!(
641                            update = ?update,
642                            "FaultyCreationDelta"
643                        );
644                        update.change = ChangeType::Update;
645                    }
646
647                    if state_guard.tokens.contains_key(key) {
648                        let original_address = update.address;
649                        // If the account is a token, we need to handle it with a proxy contract.
650                        // Storage updates apply to the proxy contract (at original address).
651                        // Code updates (if any) apply to the token implementation contract (at
652                        // impl_addr).
653
654                        // Handle proxy contract updates
655                        let impl_addr = match state_guard
656                            .proxy_token_addresses
657                            .get(&original_address)
658                        {
659                            Some(impl_addr) => {
660                                // Token already has a proxy contract.
661
662                                // Apply the storage update to proxy contract
663                                let proxy_update = AccountUpdate { code: None, ..update.clone() };
664                                account_update_by_address.insert(original_address, proxy_update);
665
666                                *impl_addr
667                            }
668                            None => {
669                                // Token does not have a proxy contract yet, create one
670
671                                // Assign original token (implementation) contract to new proxy
672                                // address
673                                let impl_addr = generate_proxy_token_address(
674                                    state_guard.proxy_token_addresses.len() as u32,
675                                )?;
676                                state_guard
677                                    .proxy_token_addresses
678                                    .insert(original_address, impl_addr);
679
680                                // Create proxy token account with original account's storage (at
681                                // original address). Track it separately so it can be
682                                // force-overwritten and win over any placeholder that another
683                                // decoder's snapshot loop may have written earlier.
684                                let proxy_state = create_proxy_token_account(
685                                    original_address,
686                                    Some(impl_addr),
687                                    &update.slots,
688                                    update.chain,
689                                    update.balance,
690                                );
691                                new_proxy_accounts.push(proxy_state);
692
693                                impl_addr
694                            }
695                        };
696
697                        // Apply code update to token implementation contract
698                        if update.code.is_some() {
699                            let impl_update = AccountUpdate {
700                                address: impl_addr,
701                                slots: HashMap::new(),
702                                ..update.clone()
703                            };
704                            account_update_by_address.insert(impl_addr, impl_update);
705                        }
706                    } else {
707                        // Not a token, apply update to the account at its original address
708                        account_update_by_address.insert(update.address, update);
709                    }
710                }
711                drop(state_guard);
712
713                let state_guard = self.state.read().await;
714                info!("Updating engine with {} contract deltas", deltas.account_updates.len());
715                update_engine(
716                    SHARED_TYCHO_DB.clone(),
717                    header.clone().block(),
718                    None,
719                    account_update_by_address,
720                )
721                .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
722
723                // Force-overwrite any newly-created proxy token accounts so they always win
724                // over placeholder entries inserted by other decoders' snapshot loops.
725                if !new_proxy_accounts.is_empty() {
726                    SHARED_TYCHO_DB
727                        .force_update_accounts(new_proxy_accounts)
728                        .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
729                }
730                info!("Engine updated");
731
732                // Collect all pools related to the updated accounts
733                let mut pools_to_update = HashSet::new();
734                for (account, _update) in deltas.account_updates {
735                    // get new pools related to the account updated
736                    pools_to_update.extend(
737                        contracts_map
738                            .get(&account)
739                            .cloned()
740                            .unwrap_or_default(),
741                    );
742                    // get existing pools related to the account updated
743                    pools_to_update.extend(
744                        state_guard
745                            .contracts_map
746                            .get(&account)
747                            .cloned()
748                            .unwrap_or_default(),
749                    );
750                }
751
752                // Collect all balance changes this block
753                let all_balances = Balances {
754                    component_balances: deltas
755                        .component_balances
756                        .iter()
757                        .map(|(pool_id, bals)| {
758                            let mut balances = HashMap::new();
759                            for (t, b) in &bals.0 {
760                                balances.insert(t.clone(), b.balance.clone());
761                            }
762                            pools_to_update.insert(pool_id.clone());
763                            (pool_id.clone(), balances)
764                        })
765                        .collect(),
766                    account_balances: deltas
767                        .account_balances
768                        .iter()
769                        .map(|(account, bals)| {
770                            let mut balances = HashMap::new();
771                            for (t, b) in bals {
772                                balances.insert(t.clone(), b.balance.clone());
773                            }
774                            pools_to_update.extend(
775                                contracts_map
776                                    .get(account)
777                                    .cloned()
778                                    .unwrap_or_default(),
779                            );
780                            (account.clone(), balances)
781                        })
782                        .collect(),
783                };
784
785                // update states with protocol state deltas (attribute changes etc.)
786                for (id, update) in deltas.state_updates {
787                    // TODO: is this needed?
788                    let update_with_block =
789                        Self::add_block_info_to_delta(update, current_block.clone());
790                    match Self::apply_update(
791                        &id,
792                        update_with_block,
793                        &mut updated_states,
794                        &state_guard,
795                        &all_balances,
796                    ) {
797                        Ok(_) => {
798                            pools_to_update.remove(&id);
799                        }
800                        Err(e) => {
801                            if self.skip_state_decode_failures {
802                                warn!(pool = id, error = %e, "Failed to apply state update, marking component as removed");
803                                // Remove from updated_states if it was there
804                                updated_states.remove(&id);
805                                // Try to get component from new_pairs first, then from state
806                                if let Some(component) = new_pairs.remove(&id) {
807                                    removed_pairs.insert(id.clone(), component);
808                                } else if let Some(component) = state_guard.components.get(&id) {
809                                    removed_pairs.insert(id.clone(), component.clone());
810                                } else {
811                                    // Component not found in new_pairs or state, this shouldn't
812                                    // happen
813                                    warn!(pool = id, "Component not found in new_pairs or state, cannot add to removed_pairs");
814                                }
815                                pools_to_update.remove(&id);
816
817                                // Add to failed components
818                                msg_failed_components.insert(id.clone());
819                            } else {
820                                return Err(e);
821                            }
822                        }
823                    }
824                }
825
826                // update remaining pools linked to updated contracts/updated balances
827                for pool in pools_to_update {
828                    // TODO: is this needed?
829                    let default_delta_with_block = Self::add_block_info_to_delta(
830                        ProtocolStateDelta::default(),
831                        current_block.clone(),
832                    );
833                    match Self::apply_update(
834                        &pool,
835                        default_delta_with_block,
836                        &mut updated_states,
837                        &state_guard,
838                        &all_balances,
839                    ) {
840                        Ok(_) => {}
841                        Err(e) => {
842                            if self.skip_state_decode_failures {
843                                warn!(pool = pool, error = %e, "Failed to apply contract/balance update, marking component as removed");
844                                // Remove from updated_states if it was there
845                                updated_states.remove(&pool);
846                                // Try to get component from new_pairs first, then from state
847                                if let Some(component) = new_pairs.remove(&pool) {
848                                    removed_pairs.insert(pool.clone(), component);
849                                } else if let Some(component) = state_guard.components.get(&pool) {
850                                    removed_pairs.insert(pool.clone(), component.clone());
851                                } else {
852                                    // Component not found in new_pairs or state, this shouldn't
853                                    // happen
854                                    warn!(pool = pool, "Component not found in new_pairs or state, cannot add to removed_pairs");
855                                }
856
857                                // Add to failed components
858                                msg_failed_components.insert(pool.clone());
859                            } else {
860                                return Err(e);
861                            }
862                        }
863                    }
864                }
865            };
866        }
867
868        // Persist the newly added/updated states
869        let mut state_guard = self.state.write().await;
870
871        // Update failed components with any new ones
872        state_guard
873            .failed_components
874            .extend(msg_failed_components);
875
876        // Remove any failed components from Updates
877        // Perf: we could do it directly in the decoder logic to avoid some steps, but this logic is
878        // complex and this is more robust.
879        updated_states.retain(|id, _| {
880            !state_guard
881                .failed_components
882                .contains(id)
883        });
884        new_pairs.retain(|id, _| {
885            !state_guard
886                .failed_components
887                .contains(id)
888        });
889
890        state_guard
891            .states
892            .extend(updated_states.clone().into_iter());
893
894        // Add new components to persistent state
895        for (id, component) in new_pairs.iter() {
896            state_guard
897                .components
898                .insert(id.clone(), component.clone());
899        }
900
901        // Remove components from persistent state
902        for (id, _) in removed_pairs.iter() {
903            state_guard.components.remove(id);
904        }
905
906        for (key, values) in contracts_map {
907            state_guard
908                .contracts_map
909                .entry(key)
910                .or_insert_with(HashSet::new)
911                .extend(values);
912        }
913
914        // Send the tick with all updated states
915        Ok(Update::new(block_number_or_timestamp, updated_states, new_pairs)
916            .set_removed_pairs(removed_pairs)
917            .set_sync_states(msg.sync_states.clone()))
918    }
919
920    /// Add block information (number and timestamp) to a ProtocolStateDelta
921    fn add_block_info_to_delta(
922        mut delta: ProtocolStateDelta,
923        block_header_opt: Option<BlockHeader>,
924    ) -> ProtocolStateDelta {
925        if let Some(header) = block_header_opt {
926            // Add block_number and block_timestamp attributes to ensure pool states
927            // receive current block information during delta_transition
928            delta.updated_attributes.insert(
929                "block_number".to_string(),
930                Bytes::from(header.number.to_be_bytes().to_vec()),
931            );
932            delta.updated_attributes.insert(
933                "block_timestamp".to_string(),
934                Bytes::from(header.timestamp.to_be_bytes().to_vec()),
935            );
936        }
937        delta
938    }
939
940    fn apply_update(
941        id: &String,
942        update: ProtocolStateDelta,
943        updated_states: &mut HashMap<String, Box<dyn ProtocolSim>>,
944        state_guard: &RwLockReadGuard<'_, DecoderState>,
945        all_balances: &Balances,
946    ) -> Result<(), StreamDecodeError> {
947        match updated_states.entry(id.clone()) {
948            Entry::Occupied(mut entry) => {
949                // If state exists in updated_states, apply the delta to it
950                let state: &mut Box<dyn ProtocolSim> = entry.get_mut();
951                state
952                    .delta_transition(update, &state_guard.tokens, all_balances)
953                    .map_err(|e| {
954                        error!(pool = id, error = ?e, "DeltaTransitionError");
955                        StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
956                    })?;
957            }
958            Entry::Vacant(_) => {
959                match state_guard.states.get(id) {
960                    // If state does not exist in updated_states, apply the delta to the stored
961                    // state
962                    Some(stored_state) => {
963                        let mut state = stored_state.clone();
964                        state
965                            .delta_transition(update, &state_guard.tokens, all_balances)
966                            .map_err(|e| {
967                                error!(pool = id, error = ?e, "DeltaTransitionError");
968                                StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
969                            })?;
970                        updated_states.insert(id.clone(), state);
971                    }
972                    None => debug!(pool = id, reason = "MissingState", "DeltaTransitionError"),
973                }
974            }
975        }
976        Ok(())
977    }
978}
979
980/// Generate a proxy token address for a given token index
981fn generate_proxy_token_address(idx: u32) -> Result<Address, StreamDecodeError> {
982    let padded_idx = format!("{idx:x}");
983    let padded_zeroes = "0".repeat(33 - padded_idx.len());
984    let proxy_token_address = format!("{padded_zeroes}{padded_idx}BAdbaBe");
985    let decoded = hex::decode(proxy_token_address).map_err(|e| {
986        StreamDecodeError::Fatal(format!("Invalid proxy token address encoding: {e}"))
987    })?;
988
989    const ADDRESS_LENGTH: usize = 20;
990    if decoded.len() != ADDRESS_LENGTH {
991        return Err(StreamDecodeError::Fatal(format!(
992            "Invalid proxy token address length: expected {}, got {}",
993            ADDRESS_LENGTH,
994            decoded.len(),
995        )));
996    }
997
998    Ok(Address::from_slice(&decoded))
999}
1000
1001/// Create a proxy token account for a token at a given address
1002///
1003/// The proxy token account is created at the original token address and points to the new token
1004/// address.
1005fn create_proxy_token_account(
1006    addr: Address,
1007    new_address: Option<Address>,
1008    storage: &HashMap<U256, U256>,
1009    chain: Chain,
1010    balance: Option<U256>,
1011) -> AccountUpdate {
1012    let mut slots = storage.clone();
1013    if let Some(new_address) = new_address {
1014        slots.insert(*IMPLEMENTATION_SLOT, U256::from_be_slice(new_address.as_slice()));
1015    }
1016
1017    AccountUpdate {
1018        address: addr,
1019        chain,
1020        slots,
1021        balance,
1022        code: Some(ERC20_PROXY_BYTECODE.to_vec()),
1023        change: ChangeType::Creation,
1024    }
1025}
1026
// Test-only mock of a protocol simulation state, generated with `mockall::mock!`.
//
// The generated type is referred to as `MockProtocolSim` in the rest of this file. Each
// method listed here becomes an inherent, expectation-backed method on the mock (configured
// through `expect_<method>()`, as the tests below do). The `ProtocolSim` trait itself is
// implemented by hand further down and forwards to these methods.
#[cfg(test)]
mock! {
    #[derive(Debug)]
    pub ProtocolSim {
        pub fn fee(&self) -> f64;
        pub fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError>;
        pub fn get_amount_out(
            &self,
            amount_in: BigUint,
            token_in: &Token,
            token_out: &Token,
        ) -> Result<GetAmountOutResult, SimulationError>;
        pub fn get_limits(
            &self,
            sell_token: Bytes,
            buy_token: Bytes,
        ) -> Result<(BigUint, BigUint), SimulationError>;
        pub fn delta_transition(
            &mut self,
            delta: ProtocolStateDelta,
            tokens: &HashMap<Bytes, Token>,
            balances: &Balances,
        ) -> Result<(), TransitionError>;
        // Mocked so tests can control what cloning a boxed state returns.
        pub fn clone_box(&self) -> Box<dyn ProtocolSim>;
        pub fn eq(&self, other: &dyn ProtocolSim) -> bool;
    }
}
1054
// NOTE(review): the macro is defined elsewhere in the crate; judging by its name it marks
// `MockProtocolSim` as a protocol that does not support serialization, with "test protocol"
// as the label it reports. Confirm against the macro definition.
#[cfg(test)]
crate::impl_non_serializable_protocol!(MockProtocolSim, "test protocol");
1057
/// Hand-written `ProtocolSim` implementation for the mock.
///
/// Every mockable method forwards to the inherent method of the same name that `mock!`
/// generated on `MockProtocolSim`. The calls are spelled with an explicit
/// `MockProtocolSim::` path to make clear they hit the inherent method and are not recursive
/// trait calls. Methods without a mocked counterpart panic when invoked.
#[cfg(test)]
impl ProtocolSim for MockProtocolSim {
    fn fee(&self) -> f64 {
        MockProtocolSim::fee(self)
    }

    fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError> {
        MockProtocolSim::spot_price(self, base, quote)
    }

    fn get_amount_out(
        &self,
        amount_in: BigUint,
        token_in: &Token,
        token_out: &Token,
    ) -> Result<GetAmountOutResult, SimulationError> {
        MockProtocolSim::get_amount_out(self, amount_in, token_in, token_out)
    }

    fn get_limits(
        &self,
        sell_token: Bytes,
        buy_token: Bytes,
    ) -> Result<(BigUint, BigUint), SimulationError> {
        MockProtocolSim::get_limits(self, sell_token, buy_token)
    }

    fn delta_transition(
        &mut self,
        delta: ProtocolStateDelta,
        tokens: &HashMap<Bytes, Token>,
        balances: &Balances,
    ) -> Result<(), TransitionError> {
        MockProtocolSim::delta_transition(self, delta, tokens, balances)
    }

    fn clone_box(&self) -> Box<dyn ProtocolSim> {
        MockProtocolSim::clone_box(self)
    }

    // Downcasting is not supported by the mock.
    fn as_any(&self) -> &dyn Any {
        panic!("MockProtocolSim does not support as_any")
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        panic!("MockProtocolSim does not support as_any_mut")
    }

    fn eq(&self, other: &dyn ProtocolSim) -> bool {
        MockProtocolSim::eq(self, other)
    }

    // The typetag hooks are not expected to be reached in these tests.
    fn typetag_name(&self) -> &'static str {
        unreachable!()
    }

    fn typetag_deserialize(&self) {
        unreachable!()
    }
}
1118
1119#[cfg(test)]
1120mod tests {
1121    use std::{fs, path::Path, str::FromStr};
1122
1123    use alloy::primitives::address;
1124    use mockall::predicate::*;
1125    use rstest::*;
1126    use tycho_client::feed::BlockHeader;
1127    use tycho_common::{models::Chain, Bytes};
1128
1129    use super::*;
1130    use crate::evm::protocol::uniswap_v2::state::UniswapV2State;
1131
1132    async fn setup_decoder(set_tokens: bool) -> TychoStreamDecoder<BlockHeader> {
1133        let mut decoder = TychoStreamDecoder::new();
1134        decoder.register_decoder::<UniswapV2State>("uniswap_v2");
1135        if set_tokens {
1136            let tokens = [
1137                Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0),
1138                Bytes::from("0xdac17f958d2ee523a2206206994597c13d831ec7").lpad(20, 0),
1139            ]
1140            .iter()
1141            .map(|addr| {
1142                let addr_str = format!("{addr:x}");
1143                (
1144                    addr.clone(),
1145                    Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1146                )
1147            })
1148            .collect();
1149            decoder.set_tokens(tokens).await;
1150        }
1151        decoder
1152    }
1153
1154    fn load_test_msg(name: &str) -> FeedMessage<BlockHeader> {
1155        let project_root = env!("CARGO_MANIFEST_DIR");
1156        let asset_path = Path::new(project_root).join(format!("tests/assets/decoder/{name}.json"));
1157        let json_data = fs::read_to_string(asset_path).expect("Failed to read test asset");
1158        serde_json::from_str(&json_data).expect("Failed to deserialize FeedMsg json!")
1159    }
1160
1161    #[tokio::test]
1162    async fn test_decode() {
1163        let decoder = setup_decoder(true).await;
1164
1165        let msg = load_test_msg("uniswap_v2_snapshot");
1166        let res1 = decoder
1167            .decode(&msg)
1168            .await
1169            .expect("decode failure");
1170        let msg = load_test_msg("uniswap_v2_delta");
1171        let res2 = decoder
1172            .decode(&msg)
1173            .await
1174            .expect("decode failure");
1175
1176        assert_eq!(res1.states.len(), 1);
1177        assert_eq!(res2.states.len(), 1);
1178        assert_eq!(res1.sync_states.len(), 1);
1179        assert_eq!(res2.sync_states.len(), 1);
1180    }
1181
1182    #[tokio::test]
1183    async fn test_decode_component_missing_token() {
1184        let decoder = setup_decoder(false).await;
1185        let tokens = [Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0)]
1186            .iter()
1187            .map(|addr| {
1188                let addr_str = format!("{addr:x}");
1189                (
1190                    addr.clone(),
1191                    Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1192                )
1193            })
1194            .collect();
1195        decoder.set_tokens(tokens).await;
1196
1197        let msg = load_test_msg("uniswap_v2_snapshot");
1198        let res1 = decoder
1199            .decode(&msg)
1200            .await
1201            .expect("decode failure");
1202
1203        assert_eq!(res1.states.len(), 0);
1204    }
1205
1206    #[tokio::test]
1207    async fn test_decode_component_bad_id() {
1208        let decoder = setup_decoder(true).await;
1209        let msg = load_test_msg("uniswap_v2_snapshot_broken_id");
1210
1211        match decoder.decode(&msg).await {
1212            Err(StreamDecodeError::Fatal(msg)) => {
1213                assert_eq!(msg, "Component id mismatch");
1214            }
1215            Ok(_) => {
1216                panic!("Expected failures to be raised")
1217            }
1218        }
1219    }
1220
1221    #[rstest]
1222    #[case(true)]
1223    #[case(false)]
1224    #[tokio::test]
1225    async fn test_decode_component_bad_state(#[case] skip_failures: bool) {
1226        let mut decoder = setup_decoder(true).await;
1227        decoder.skip_state_decode_failures = skip_failures;
1228
1229        let msg = load_test_msg("uniswap_v2_snapshot_broken_state");
1230        match decoder.decode(&msg).await {
1231            Err(StreamDecodeError::Fatal(msg)) => {
1232                if !skip_failures {
1233                    assert_eq!(msg, "Missing attributes reserve0");
1234                } else {
1235                    panic!("Expected failures to be ignored. Err: {msg}")
1236                }
1237            }
1238            Ok(res) => {
1239                if !skip_failures {
1240                    panic!("Expected failures to be raised")
1241                } else {
1242                    assert_eq!(res.states.len(), 0);
1243                }
1244            }
1245        }
1246    }
1247
    /// A contract delta for an address linked to a pool in `contracts_map` must trigger a
    /// `delta_transition` on that pool's state, even though the message is not loaded as a
    /// state update for the pool itself.
    ///
    /// The check is carried entirely by the mock expectations: mockall fails the test if any
    /// `times(..)` constraint is violated.
    #[tokio::test]
    async fn test_decode_updates_state_on_contract_change() {
        let decoder = setup_decoder(true).await;

        // Create the mock instances
        let mut mock_state = MockProtocolSim::new();

        // The persisted state is expected to be cloned exactly once; the transition is then
        // applied to the clone.
        mock_state
            .expect_clone_box()
            .times(1)
            .returning(|| {
                let mut cloned_mock_state = MockProtocolSim::new();
                // Expect `delta_transition` to be called once with any parameters
                cloned_mock_state
                    .expect_delta_transition()
                    .times(1)
                    .returning(|_, _, _| Ok(()));
                // The transitioned state is itself cloned once more (presumably when the
                // decoder copies the updated states into its persistent state).
                cloned_mock_state
                    .expect_clone_box()
                    .times(1)
                    .returning(|| Box::new(MockProtocolSim::new()));
                Box::new(cloned_mock_state)
            });

        // Store the mock as the persisted state of the pool...
        let pool_id =
            "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2".to_string();
        decoder
            .state
            .write()
            .await
            .states
            .insert(pool_id.clone(), Box::new(mock_state) as Box<dyn ProtocolSim>);
        // ...and link the pool to the contract address, so that an update to that contract
        // marks the pool as needing a transition.
        decoder
            .state
            .write()
            .await
            .contracts_map
            .insert(
                Bytes::from("0xba12222222228d8ba445958a75a0704d566bf2c8").lpad(20, 0),
                HashSet::from([pool_id.clone()]),
            );

        // Load a test message containing a contract update
        let msg = load_test_msg("balancer_v2_delta");

        // Decode the message
        let _ = decoder
            .decode(&msg)
            .await
            .expect("decode failure");

        // The mock framework will assert that `delta_transition` was called exactly once
    }
1302
1303    #[test]
1304    fn test_generate_proxy_token_address() {
1305        let idx = 1;
1306        let generated_address =
1307            generate_proxy_token_address(idx).expect("proxy token address should be valid");
1308        assert_eq!(generated_address, address!("000000000000000000000000000000001badbabe"));
1309
1310        let idx = 123456;
1311        let generated_address =
1312            generate_proxy_token_address(idx).expect("proxy token address should be valid");
1313        assert_eq!(generated_address, address!("00000000000000000000000000001e240badbabe"));
1314    }
1315
1316    #[tokio::test(flavor = "multi_thread")]
1317    async fn test_euler_hook_low_pool_manager_balance() {
1318        let mut decoder = TychoStreamDecoder::new();
1319
1320        decoder.register_decoder_with_context::<crate::evm::protocol::uniswap_v4::state::UniswapV4State>(
1321            "uniswap_v4_hooks", DecoderContext::new().vm_traces(true)
1322        );
1323
1324        let weth = Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap();
1325        let teth = Bytes::from_str("0xd11c452fc99cf405034ee446803b6f6c1f6d5ed8").unwrap();
1326        let tokens = HashMap::from([
1327            (
1328                weth.clone(),
1329                Token::new(&weth, "WETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1330            ),
1331            (
1332                teth.clone(),
1333                Token::new(&teth, "tETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1334            ),
1335        ]);
1336
1337        decoder.set_tokens(tokens.clone()).await;
1338
1339        let msg = load_test_msg("euler_hook_snapshot");
1340        let res = decoder
1341            .decode(&msg)
1342            .await
1343            .expect("decode failure");
1344
1345        let pool_state = res
1346            .states
1347            .get("0xc70d7fbd7fcccdf726e02fed78548b40dc52502b097c7a1ee7d995f4d4396134")
1348            .expect("Couldn't find target pool");
1349        let amount_out = pool_state
1350            .get_amount_out(
1351                BigUint::from_str("1000000000000000000").unwrap(),
1352                tokens.get(&teth).unwrap(),
1353                tokens.get(&weth).unwrap(),
1354            )
1355            .expect("Get amount out failed");
1356
1357        assert_eq!(amount_out.amount, BigUint::from_str("1216190190361759119").unwrap());
1358    }
1359}