1use std::{
2 collections::{hash_map::Entry, HashMap, HashSet},
3 future::Future,
4 pin::Pin,
5 sync::Arc,
6};
7
8use alloy::primitives::{Address, U256};
9use thiserror::Error;
10use tokio::sync::{RwLock, RwLockReadGuard};
11use tracing::{debug, error, info, warn};
12use tycho_client::feed::{synchronizer::ComponentWithState, BlockHeader, FeedMessage, HeaderLike};
13use tycho_common::{
14 dto::{ChangeType, ProtocolStateDelta},
15 models::{token::Token, Chain},
16 simulation::protocol_sim::{Balances, ProtocolSim},
17 Bytes,
18};
19#[cfg(test)]
20use {
21 mockall::mock,
22 num_bigint::BigUint,
23 std::any::Any,
24 tycho_common::simulation::{
25 errors::{SimulationError, TransitionError},
26 protocol_sim::GetAmountOutResult,
27 },
28};
29
30use crate::{
31 evm::{
32 engine_db::{update_engine, SHARED_TYCHO_DB},
33 protocol::{
34 utils::bytes_to_address,
35 vm::{constants::ERC20_PROXY_BYTECODE, erc20_token::IMPLEMENTATION_SLOT},
36 },
37 tycho_models::{AccountUpdate, ResponseAccount},
38 },
39 protocol::{
40 errors::InvalidSnapshotError,
41 models::{DecoderContext, ProtocolComponent, TryFromWithBlock, Update},
42 },
43};
44
45#[derive(Error, Debug)]
46pub enum StreamDecodeError {
47 #[error("{0}")]
48 Fatal(String),
49}
50
51#[derive(Default)]
52struct DecoderState {
53 tokens: HashMap<Bytes, Token>,
54 states: HashMap<String, Box<dyn ProtocolSim>>,
55 components: HashMap<String, ProtocolComponent>,
56 contracts_map: HashMap<Bytes, HashSet<String>>,
58 proxy_token_addresses: HashMap<Address, Address>,
60 failed_components: HashSet<String>,
64}
65
66type DecodeFut =
67 Pin<Box<dyn Future<Output = Result<Box<dyn ProtocolSim>, InvalidSnapshotError>> + Send + Sync>>;
68type AccountBalances = HashMap<Bytes, HashMap<Bytes, Bytes>>;
69type RegistryFn<H> = dyn Fn(ComponentWithState, H, AccountBalances, Arc<RwLock<DecoderState>>) -> DecodeFut
70 + Send
71 + Sync;
72type FilterFn = fn(&ComponentWithState) -> bool;
73
74pub struct TychoStreamDecoder<H>
87where
88 H: HeaderLike,
89{
90 state: Arc<RwLock<DecoderState>>,
91 skip_state_decode_failures: bool,
92 min_token_quality: u32,
93 registry: HashMap<String, Box<RegistryFn<H>>>,
94 inclusion_filters: HashMap<String, FilterFn>,
95}
96
97impl<H> Default for TychoStreamDecoder<H>
98where
99 H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
100{
101 fn default() -> Self {
102 Self::new()
103 }
104}
105
106impl<H> TychoStreamDecoder<H>
107where
108 H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
109{
110 pub fn new() -> Self {
111 Self {
112 state: Arc::new(RwLock::new(DecoderState::default())),
113 skip_state_decode_failures: false,
114 min_token_quality: 51,
115 registry: HashMap::new(),
116 inclusion_filters: HashMap::new(),
117 }
118 }
119
120 pub async fn set_tokens(&self, tokens: HashMap<Bytes, Token>) {
125 let mut guard = self.state.write().await;
126 guard.tokens = tokens;
127 }
128
129 pub fn skip_state_decode_failures(&mut self, skip: bool) {
130 self.skip_state_decode_failures = skip;
131 }
132
133 pub fn register_decoder_with_context<T>(&mut self, exchange: &str, context: DecoderContext)
146 where
147 T: ProtocolSim
148 + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
149 + Send
150 + 'static,
151 {
152 let decoder = Box::new(
153 move |component: ComponentWithState,
154 header: H,
155 account_balances: AccountBalances,
156 state: Arc<RwLock<DecoderState>>| {
157 let context = context.clone();
158 Box::pin(async move {
159 let guard = state.read().await;
160 T::try_from_with_header(
161 component,
162 header,
163 &account_balances,
164 &guard.tokens,
165 &context,
166 )
167 .await
168 .map(|c| Box::new(c) as Box<dyn ProtocolSim>)
169 }) as DecodeFut
170 },
171 );
172 self.registry
173 .insert(exchange.to_string(), decoder);
174 }
175
176 pub fn register_decoder<T>(&mut self, exchange: &str)
188 where
189 T: ProtocolSim
190 + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
191 + Send
192 + 'static,
193 {
194 let context = DecoderContext::new();
195 self.register_decoder_with_context::<T>(exchange, context);
196 }
197
198 pub fn register_filter(&mut self, exchange: &str, predicate: FilterFn) {
214 self.inclusion_filters
215 .insert(exchange.to_string(), predicate);
216 }
217
218 pub async fn decode(&self, msg: &FeedMessage<H>) -> Result<Update, StreamDecodeError> {
221 let mut updated_states = HashMap::new();
223 let mut new_pairs = HashMap::new();
224 let mut removed_pairs = HashMap::new();
225 let mut contracts_map = HashMap::new();
226 let mut msg_failed_components = HashSet::new();
227
228 let header = msg
229 .state_msgs
230 .values()
231 .next()
232 .ok_or_else(|| StreamDecodeError::Fatal("Missing block!".into()))?
233 .header
234 .clone();
235
236 let block_number_or_timestamp = header
237 .clone()
238 .block_number_or_timestamp();
239 let current_block = header.clone().block();
240
241 for (protocol, protocol_msg) in msg.state_msgs.iter() {
242 if let Some(deltas) = protocol_msg.deltas.as_ref() {
244 let mut state_guard = self.state.write().await;
245
246 let new_tokens = deltas
247 .new_tokens
248 .iter()
249 .filter(|(addr, t)| {
250 t.quality >= self.min_token_quality &&
251 !state_guard.tokens.contains_key(*addr)
252 })
253 .filter_map(|(addr, t)| {
254 t.clone()
255 .try_into()
256 .map(|token| (addr.clone(), token))
257 .inspect_err(|e| {
258 warn!("Failed decoding token {e:?} {addr:#044x}");
259 *e
260 })
261 .ok()
262 })
263 .collect::<HashMap<Bytes, Token>>();
264
265 if !new_tokens.is_empty() {
266 debug!(n = new_tokens.len(), "NewTokens");
267 state_guard.tokens.extend(new_tokens);
268 }
269 }
270
271 {
273 let mut state_guard = self.state.write().await;
274 let removed_components: Vec<(String, ProtocolComponent)> = protocol_msg
275 .removed_components
276 .iter()
277 .map(|(id, comp)| {
278 if *id != comp.id {
279 error!(
280 "Component id mismatch in removed components {id} != {}",
281 comp.id
282 );
283 return Err(StreamDecodeError::Fatal("Component id mismatch".into()));
284 }
285
286 let tokens = comp
287 .tokens
288 .iter()
289 .flat_map(|addr| state_guard.tokens.get(addr).cloned())
290 .collect::<Vec<_>>();
291
292 if tokens.len() == comp.tokens.len() {
293 Ok(Some((
294 id.clone(),
295 ProtocolComponent::from_with_tokens(comp.clone(), tokens),
296 )))
297 } else {
298 Ok(None)
299 }
300 })
301 .collect::<Result<Vec<Option<(String, ProtocolComponent)>>, StreamDecodeError>>(
302 )?
303 .into_iter()
304 .flatten()
305 .collect();
306
307 for (id, component) in removed_components {
309 state_guard.components.remove(&id);
310 state_guard.states.remove(&id);
311 removed_pairs.insert(id, component);
312 }
313
314 info!(
316 "Processing {} contracts from snapshots",
317 protocol_msg
318 .snapshots
319 .get_vm_storage()
320 .len()
321 );
322
323 let mut proxy_token_accounts: HashMap<Address, AccountUpdate> = HashMap::new();
324 let mut storage_by_address: HashMap<Address, ResponseAccount> = HashMap::new();
325 for (key, value) in protocol_msg
326 .snapshots
327 .get_vm_storage()
328 .iter()
329 {
330 let account: ResponseAccount = value.clone().into();
331
332 if state_guard.tokens.contains_key(key) {
333 let original_address = account.address;
334 let (impl_addr, proxy_state) = match state_guard
343 .proxy_token_addresses
344 .get(&original_address)
345 {
346 Some(impl_addr) => {
347 let proxy_state = AccountUpdate::new(
354 original_address,
355 value.chain.into(),
356 account.slots.clone(),
357 Some(account.native_balance),
358 None,
359 ChangeType::Update,
360 );
361 (*impl_addr, proxy_state)
362 }
363 None => {
364 let impl_addr = generate_proxy_token_address(
368 state_guard.proxy_token_addresses.len() as u32,
369 )?;
370 state_guard
371 .proxy_token_addresses
372 .insert(original_address, impl_addr);
373
374 let proxy_state = create_proxy_token_account(
376 original_address,
377 Some(impl_addr),
378 &account.slots,
379 value.chain.into(),
380 Some(account.native_balance),
381 );
382
383 (impl_addr, proxy_state)
384 }
385 };
386
387 proxy_token_accounts.insert(original_address, proxy_state);
388
389 let impl_update = ResponseAccount {
391 address: impl_addr,
392 slots: HashMap::new(),
393 ..account.clone()
394 };
395 storage_by_address.insert(impl_addr, impl_update);
396 } else {
397 storage_by_address.insert(account.address, account);
399 }
400 }
401
402 info!("Updating engine with {} contracts from snapshots", storage_by_address.len());
403 update_engine(
404 SHARED_TYCHO_DB.clone(),
405 header.clone().block(),
406 Some(storage_by_address),
407 proxy_token_accounts,
408 )
409 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
410 info!("Engine updated");
411 drop(state_guard);
412 }
413
414 let account_balances = protocol_msg
417 .clone()
418 .snapshots
419 .get_vm_storage()
420 .iter()
421 .filter_map(|(addr, acc)| {
422 let balances = acc.token_balances.clone();
423 if balances.is_empty() {
424 return None;
425 }
426 Some((addr.clone(), balances))
427 })
428 .collect::<AccountBalances>();
429
430 let mut new_components = HashMap::new();
431 let mut count_token_skips = 0;
432 let mut components_to_store = HashMap::new();
433 {
434 let state_guard = self.state.read().await;
435
436 'snapshot_loop: for (id, snapshot) in protocol_msg
438 .snapshots
439 .get_states()
440 .clone()
441 {
442 if self
444 .inclusion_filters
445 .get(protocol.as_str())
446 .is_some_and(|predicate| !predicate(&snapshot))
447 {
448 continue;
449 }
450
451 let mut component_tokens = Vec::new();
453 let mut new_tokens_accounts = HashMap::new();
454 for token in snapshot.component.tokens.clone() {
455 match state_guard.tokens.get(&token) {
456 Some(token) => {
457 component_tokens.push(token.clone());
458
459 let token_address = match bytes_to_address(&token.address) {
462 Ok(addr) => addr,
463 Err(_) => {
464 count_token_skips += 1;
465 msg_failed_components.insert(id.clone());
466 warn!(
467 "Token address could not be decoded {}, ignoring pool {:x?}",
468 token.address, id
469 );
470 continue 'snapshot_loop;
471 }
472 };
473 if !state_guard
475 .proxy_token_addresses
476 .contains_key(&token_address)
477 {
478 new_tokens_accounts.insert(
479 token_address,
480 create_proxy_token_account(
481 token_address,
482 None,
483 &HashMap::new(),
484 snapshot.component.chain.into(),
485 None,
486 ),
487 );
488 }
489 }
490 None => {
491 count_token_skips += 1;
492 msg_failed_components.insert(id.clone());
493 debug!("Token not found {}, ignoring pool {:x?}", token, id);
494 continue 'snapshot_loop;
495 }
496 }
497 }
498 let component = ProtocolComponent::from_with_tokens(
499 snapshot.component.clone(),
500 component_tokens,
501 );
502
503 if !new_tokens_accounts.is_empty() {
505 update_engine(
506 SHARED_TYCHO_DB.clone(),
507 header.clone().block(),
508 None,
509 new_tokens_accounts,
510 )
511 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
512 }
513
514 if !component
517 .static_attributes
518 .contains_key("manual_updates")
519 {
520 for contract in &component.contract_ids {
521 contracts_map
522 .entry(contract.clone())
523 .or_insert_with(HashSet::new)
524 .insert(id.clone());
525 }
526 for (_, tracing) in snapshot.entrypoints.iter() {
529 for contract in tracing.accessed_slots.keys().cloned() {
530 contracts_map
531 .entry(contract)
532 .or_insert_with(HashSet::new)
533 .insert(id.clone());
534 }
535 }
536 }
537
538 new_pairs.insert(id.clone(), component.clone());
540
541 components_to_store.insert(id.clone(), component);
543
544 if let Some(state_decode_f) = self.registry.get(protocol.as_str()) {
546 match state_decode_f(
547 snapshot,
548 header.clone(),
549 account_balances.clone(),
550 self.state.clone(),
551 )
552 .await
553 {
554 Ok(state) => {
555 new_components.insert(id.clone(), state);
556 }
557 Err(e) => {
558 if self.skip_state_decode_failures {
559 warn!(pool = id, error = %e, "StateDecodingFailure");
560 msg_failed_components.insert(id.clone());
561 continue 'snapshot_loop;
562 } else {
563 error!(pool = id, error = %e, "StateDecodingFailure");
564 return Err(StreamDecodeError::Fatal(format!("{e}")));
565 }
566 }
567 }
568 } else if self.skip_state_decode_failures {
569 warn!(pool = id, "MissingDecoderRegistration");
570 msg_failed_components.insert(id.clone());
571 continue 'snapshot_loop;
572 } else {
573 error!(pool = id, "MissingDecoderRegistration");
574 return Err(StreamDecodeError::Fatal(format!(
575 "Missing decoder registration for: {id}"
576 )));
577 }
578 }
579 }
580
581 if !components_to_store.is_empty() {
583 let mut state_guard = self.state.write().await;
584 for (id, component) in components_to_store {
585 state_guard
586 .components
587 .insert(id, component);
588 }
589 }
590
591 if !protocol_msg.snapshots.states.is_empty() {
592 info!("Decoded {} snapshots for protocol {protocol}", new_components.len());
593 }
594 if count_token_skips > 0 {
595 info!("Skipped {count_token_skips} pools due to missing tokens");
596 }
597
598 updated_states.extend(new_components);
600
601 if let Some(deltas) = protocol_msg.deltas.clone() {
603 let mut state_guard = self.state.write().await;
605
606 let mut account_update_by_address: HashMap<Address, AccountUpdate> = HashMap::new();
607 for (key, value) in deltas.account_updates.iter() {
608 let mut update: AccountUpdate = value.clone().into();
609
610 if update.code.is_none() && matches!(update.change, ChangeType::Creation) {
616 error!(
617 update = ?update,
618 "FaultyCreationDelta"
619 );
620 update.change = ChangeType::Update;
621 }
622
623 if state_guard.tokens.contains_key(key) {
624 let original_address = update.address;
625 let impl_addr = match state_guard
632 .proxy_token_addresses
633 .get(&original_address)
634 {
635 Some(impl_addr) => {
636 let proxy_update = AccountUpdate { code: None, ..update.clone() };
640 account_update_by_address.insert(original_address, proxy_update);
641
642 *impl_addr
643 }
644 None => {
645 let impl_addr = generate_proxy_token_address(
650 state_guard.proxy_token_addresses.len() as u32,
651 )?;
652 state_guard
653 .proxy_token_addresses
654 .insert(original_address, impl_addr);
655
656 let proxy_state = create_proxy_token_account(
659 original_address,
660 Some(impl_addr),
661 &update.slots,
662 update.chain,
663 update.balance,
664 );
665 account_update_by_address.insert(original_address, proxy_state);
666
667 impl_addr
668 }
669 };
670
671 if update.code.is_some() {
673 let impl_update = AccountUpdate {
674 address: impl_addr,
675 slots: HashMap::new(),
676 ..update.clone()
677 };
678 account_update_by_address.insert(impl_addr, impl_update);
679 }
680 } else {
681 account_update_by_address.insert(update.address, update);
683 }
684 }
685 drop(state_guard);
686
687 let state_guard = self.state.read().await;
688 info!("Updating engine with {} contract deltas", deltas.account_updates.len());
689 update_engine(
690 SHARED_TYCHO_DB.clone(),
691 header.clone().block(),
692 None,
693 account_update_by_address,
694 )
695 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
696 info!("Engine updated");
697
698 let mut pools_to_update = HashSet::new();
700 for (account, _update) in deltas.account_updates {
701 pools_to_update.extend(
703 contracts_map
704 .get(&account)
705 .cloned()
706 .unwrap_or_default(),
707 );
708 pools_to_update.extend(
710 state_guard
711 .contracts_map
712 .get(&account)
713 .cloned()
714 .unwrap_or_default(),
715 );
716 }
717
718 let all_balances = Balances {
720 component_balances: deltas
721 .component_balances
722 .iter()
723 .map(|(pool_id, bals)| {
724 let mut balances = HashMap::new();
725 for (t, b) in &bals.0 {
726 balances.insert(t.clone(), b.balance.clone());
727 }
728 pools_to_update.insert(pool_id.clone());
729 (pool_id.clone(), balances)
730 })
731 .collect(),
732 account_balances: deltas
733 .account_balances
734 .iter()
735 .map(|(account, bals)| {
736 let mut balances = HashMap::new();
737 for (t, b) in bals {
738 balances.insert(t.clone(), b.balance.clone());
739 }
740 pools_to_update.extend(
741 contracts_map
742 .get(account)
743 .cloned()
744 .unwrap_or_default(),
745 );
746 (account.clone(), balances)
747 })
748 .collect(),
749 };
750
751 for (id, update) in deltas.state_updates {
753 let update_with_block =
755 Self::add_block_info_to_delta(update, current_block.clone());
756 match Self::apply_update(
757 &id,
758 update_with_block,
759 &mut updated_states,
760 &state_guard,
761 &all_balances,
762 ) {
763 Ok(_) => {
764 pools_to_update.remove(&id);
765 }
766 Err(e) => {
767 if self.skip_state_decode_failures {
768 warn!(pool = id, error = %e, "Failed to apply state update, marking component as removed");
769 updated_states.remove(&id);
771 if let Some(component) = new_pairs.remove(&id) {
773 removed_pairs.insert(id.clone(), component);
774 } else if let Some(component) = state_guard.components.get(&id) {
775 removed_pairs.insert(id.clone(), component.clone());
776 } else {
777 warn!(pool = id, "Component not found in new_pairs or state, cannot add to removed_pairs");
780 }
781 pools_to_update.remove(&id);
782
783 msg_failed_components.insert(id.clone());
785 } else {
786 return Err(e);
787 }
788 }
789 }
790 }
791
792 for pool in pools_to_update {
794 let default_delta_with_block = Self::add_block_info_to_delta(
796 ProtocolStateDelta::default(),
797 current_block.clone(),
798 );
799 match Self::apply_update(
800 &pool,
801 default_delta_with_block,
802 &mut updated_states,
803 &state_guard,
804 &all_balances,
805 ) {
806 Ok(_) => {}
807 Err(e) => {
808 if self.skip_state_decode_failures {
809 warn!(pool = pool, error = %e, "Failed to apply contract/balance update, marking component as removed");
810 updated_states.remove(&pool);
812 if let Some(component) = new_pairs.remove(&pool) {
814 removed_pairs.insert(pool.clone(), component);
815 } else if let Some(component) = state_guard.components.get(&pool) {
816 removed_pairs.insert(pool.clone(), component.clone());
817 } else {
818 warn!(pool = pool, "Component not found in new_pairs or state, cannot add to removed_pairs");
821 }
822
823 msg_failed_components.insert(pool.clone());
825 } else {
826 return Err(e);
827 }
828 }
829 }
830 }
831 };
832 }
833
834 let mut state_guard = self.state.write().await;
836
837 state_guard
839 .failed_components
840 .extend(msg_failed_components);
841
842 updated_states.retain(|id, _| {
846 !state_guard
847 .failed_components
848 .contains(id)
849 });
850 new_pairs.retain(|id, _| {
851 !state_guard
852 .failed_components
853 .contains(id)
854 });
855
856 state_guard
857 .states
858 .extend(updated_states.clone().into_iter());
859
860 for (id, component) in new_pairs.iter() {
862 state_guard
863 .components
864 .insert(id.clone(), component.clone());
865 }
866
867 for (id, _) in removed_pairs.iter() {
869 state_guard.components.remove(id);
870 }
871
872 for (key, values) in contracts_map {
873 state_guard
874 .contracts_map
875 .entry(key)
876 .or_insert_with(HashSet::new)
877 .extend(values);
878 }
879
880 Ok(Update::new(block_number_or_timestamp, updated_states, new_pairs)
882 .set_removed_pairs(removed_pairs)
883 .set_sync_states(msg.sync_states.clone()))
884 }
885
886 fn add_block_info_to_delta(
888 mut delta: ProtocolStateDelta,
889 block_header_opt: Option<BlockHeader>,
890 ) -> ProtocolStateDelta {
891 if let Some(header) = block_header_opt {
892 delta.updated_attributes.insert(
895 "block_number".to_string(),
896 Bytes::from(header.number.to_be_bytes().to_vec()),
897 );
898 delta.updated_attributes.insert(
899 "block_timestamp".to_string(),
900 Bytes::from(header.timestamp.to_be_bytes().to_vec()),
901 );
902 }
903 delta
904 }
905
906 fn apply_update(
907 id: &String,
908 update: ProtocolStateDelta,
909 updated_states: &mut HashMap<String, Box<dyn ProtocolSim>>,
910 state_guard: &RwLockReadGuard<'_, DecoderState>,
911 all_balances: &Balances,
912 ) -> Result<(), StreamDecodeError> {
913 match updated_states.entry(id.clone()) {
914 Entry::Occupied(mut entry) => {
915 let state: &mut Box<dyn ProtocolSim> = entry.get_mut();
917 state
918 .delta_transition(update, &state_guard.tokens, all_balances)
919 .map_err(|e| {
920 error!(pool = id, error = ?e, "DeltaTransitionError");
921 StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
922 })?;
923 }
924 Entry::Vacant(_) => {
925 match state_guard.states.get(id) {
926 Some(stored_state) => {
929 let mut state = stored_state.clone();
930 state
931 .delta_transition(update, &state_guard.tokens, all_balances)
932 .map_err(|e| {
933 error!(pool = id, error = ?e, "DeltaTransitionError");
934 StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
935 })?;
936 updated_states.insert(id.clone(), state);
937 }
938 None => debug!(pool = id, reason = "MissingState", "DeltaTransitionError"),
939 }
940 }
941 }
942 Ok(())
943 }
944}
945
946fn generate_proxy_token_address(idx: u32) -> Result<Address, StreamDecodeError> {
948 let padded_idx = format!("{idx:x}");
949 let padded_zeroes = "0".repeat(33 - padded_idx.len());
950 let proxy_token_address = format!("{padded_zeroes}{padded_idx}BAdbaBe");
951 let decoded = hex::decode(proxy_token_address).map_err(|e| {
952 StreamDecodeError::Fatal(format!("Invalid proxy token address encoding: {e}"))
953 })?;
954
955 const ADDRESS_LENGTH: usize = 20;
956 if decoded.len() != ADDRESS_LENGTH {
957 return Err(StreamDecodeError::Fatal(format!(
958 "Invalid proxy token address length: expected {}, got {}",
959 ADDRESS_LENGTH,
960 decoded.len(),
961 )));
962 }
963
964 Ok(Address::from_slice(&decoded))
965}
966
967fn create_proxy_token_account(
972 addr: Address,
973 new_address: Option<Address>,
974 storage: &HashMap<U256, U256>,
975 chain: Chain,
976 balance: Option<U256>,
977) -> AccountUpdate {
978 let mut slots = storage.clone();
979 if let Some(new_address) = new_address {
980 slots.insert(*IMPLEMENTATION_SLOT, U256::from_be_slice(new_address.as_slice()));
981 }
982
983 AccountUpdate {
984 address: addr,
985 chain,
986 slots,
987 balance,
988 code: Some(ERC20_PROXY_BYTECODE.to_vec()),
989 change: ChangeType::Creation,
990 }
991}
992
993#[cfg(test)]
994mock! {
995 #[derive(Debug)]
996 pub ProtocolSim {
997 pub fn fee(&self) -> f64;
998 pub fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError>;
999 pub fn get_amount_out(
1000 &self,
1001 amount_in: BigUint,
1002 token_in: &Token,
1003 token_out: &Token,
1004 ) -> Result<GetAmountOutResult, SimulationError>;
1005 pub fn get_limits(
1006 &self,
1007 sell_token: Bytes,
1008 buy_token: Bytes,
1009 ) -> Result<(BigUint, BigUint), SimulationError>;
1010 pub fn delta_transition(
1011 &mut self,
1012 delta: ProtocolStateDelta,
1013 tokens: &HashMap<Bytes, Token>,
1014 balances: &Balances,
1015 ) -> Result<(), TransitionError>;
1016 pub fn clone_box(&self) -> Box<dyn ProtocolSim>;
1017 pub fn eq(&self, other: &dyn ProtocolSim) -> bool;
1018 }
1019}
1020
1021#[cfg(test)]
1022crate::impl_non_serializable_protocol!(MockProtocolSim, "test protocol");
1023
1024#[cfg(test)]
1025impl ProtocolSim for MockProtocolSim {
1026 fn fee(&self) -> f64 {
1027 self.fee()
1028 }
1029
1030 fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError> {
1031 self.spot_price(base, quote)
1032 }
1033
1034 fn get_amount_out(
1035 &self,
1036 amount_in: BigUint,
1037 token_in: &Token,
1038 token_out: &Token,
1039 ) -> Result<GetAmountOutResult, SimulationError> {
1040 self.get_amount_out(amount_in, token_in, token_out)
1041 }
1042
1043 fn get_limits(
1044 &self,
1045 sell_token: Bytes,
1046 buy_token: Bytes,
1047 ) -> Result<(BigUint, BigUint), SimulationError> {
1048 self.get_limits(sell_token, buy_token)
1049 }
1050
1051 fn delta_transition(
1052 &mut self,
1053 delta: ProtocolStateDelta,
1054 tokens: &HashMap<Bytes, Token>,
1055 balances: &Balances,
1056 ) -> Result<(), TransitionError> {
1057 self.delta_transition(delta, tokens, balances)
1058 }
1059
1060 fn clone_box(&self) -> Box<dyn ProtocolSim> {
1061 self.clone_box()
1062 }
1063
1064 fn as_any(&self) -> &dyn Any {
1065 panic!("MockProtocolSim does not support as_any")
1066 }
1067
1068 fn as_any_mut(&mut self) -> &mut dyn Any {
1069 panic!("MockProtocolSim does not support as_any_mut")
1070 }
1071
1072 fn eq(&self, other: &dyn ProtocolSim) -> bool {
1073 self.eq(other)
1074 }
1075
1076 fn typetag_name(&self) -> &'static str {
1077 unreachable!()
1078 }
1079
1080 fn typetag_deserialize(&self) {
1081 unreachable!()
1082 }
1083}
1084
1085#[cfg(test)]
1086mod tests {
1087 use std::{fs, path::Path, str::FromStr};
1088
1089 use alloy::primitives::address;
1090 use mockall::predicate::*;
1091 use rstest::*;
1092 use tycho_client::feed::BlockHeader;
1093 use tycho_common::{models::Chain, Bytes};
1094
1095 use super::*;
1096 use crate::evm::protocol::uniswap_v2::state::UniswapV2State;
1097
1098 async fn setup_decoder(set_tokens: bool) -> TychoStreamDecoder<BlockHeader> {
1099 let mut decoder = TychoStreamDecoder::new();
1100 decoder.register_decoder::<UniswapV2State>("uniswap_v2");
1101 if set_tokens {
1102 let tokens = [
1103 Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0),
1104 Bytes::from("0xdac17f958d2ee523a2206206994597c13d831ec7").lpad(20, 0),
1105 ]
1106 .iter()
1107 .map(|addr| {
1108 let addr_str = format!("{addr:x}");
1109 (
1110 addr.clone(),
1111 Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1112 )
1113 })
1114 .collect();
1115 decoder.set_tokens(tokens).await;
1116 }
1117 decoder
1118 }
1119
1120 fn load_test_msg(name: &str) -> FeedMessage<BlockHeader> {
1121 let project_root = env!("CARGO_MANIFEST_DIR");
1122 let asset_path = Path::new(project_root).join(format!("tests/assets/decoder/{name}.json"));
1123 let json_data = fs::read_to_string(asset_path).expect("Failed to read test asset");
1124 serde_json::from_str(&json_data).expect("Failed to deserialize FeedMsg json!")
1125 }
1126
1127 #[tokio::test]
1128 async fn test_decode() {
1129 let decoder = setup_decoder(true).await;
1130
1131 let msg = load_test_msg("uniswap_v2_snapshot");
1132 let res1 = decoder
1133 .decode(&msg)
1134 .await
1135 .expect("decode failure");
1136 let msg = load_test_msg("uniswap_v2_delta");
1137 let res2 = decoder
1138 .decode(&msg)
1139 .await
1140 .expect("decode failure");
1141
1142 assert_eq!(res1.states.len(), 1);
1143 assert_eq!(res2.states.len(), 1);
1144 assert_eq!(res1.sync_states.len(), 1);
1145 assert_eq!(res2.sync_states.len(), 1);
1146 }
1147
1148 #[tokio::test]
1149 async fn test_decode_component_missing_token() {
1150 let decoder = setup_decoder(false).await;
1151 let tokens = [Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0)]
1152 .iter()
1153 .map(|addr| {
1154 let addr_str = format!("{addr:x}");
1155 (
1156 addr.clone(),
1157 Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1158 )
1159 })
1160 .collect();
1161 decoder.set_tokens(tokens).await;
1162
1163 let msg = load_test_msg("uniswap_v2_snapshot");
1164 let res1 = decoder
1165 .decode(&msg)
1166 .await
1167 .expect("decode failure");
1168
1169 assert_eq!(res1.states.len(), 0);
1170 }
1171
1172 #[tokio::test]
1173 async fn test_decode_component_bad_id() {
1174 let decoder = setup_decoder(true).await;
1175 let msg = load_test_msg("uniswap_v2_snapshot_broken_id");
1176
1177 match decoder.decode(&msg).await {
1178 Err(StreamDecodeError::Fatal(msg)) => {
1179 assert_eq!(msg, "Component id mismatch");
1180 }
1181 Ok(_) => {
1182 panic!("Expected failures to be raised")
1183 }
1184 }
1185 }
1186
1187 #[rstest]
1188 #[case(true)]
1189 #[case(false)]
1190 #[tokio::test]
1191 async fn test_decode_component_bad_state(#[case] skip_failures: bool) {
1192 let mut decoder = setup_decoder(true).await;
1193 decoder.skip_state_decode_failures = skip_failures;
1194
1195 let msg = load_test_msg("uniswap_v2_snapshot_broken_state");
1196 match decoder.decode(&msg).await {
1197 Err(StreamDecodeError::Fatal(msg)) => {
1198 if !skip_failures {
1199 assert_eq!(msg, "Missing attributes reserve0");
1200 } else {
1201 panic!("Expected failures to be ignored. Err: {msg}")
1202 }
1203 }
1204 Ok(res) => {
1205 if !skip_failures {
1206 panic!("Expected failures to be raised")
1207 } else {
1208 assert_eq!(res.states.len(), 0);
1209 }
1210 }
1211 }
1212 }
1213
1214 #[tokio::test]
1215 async fn test_decode_updates_state_on_contract_change() {
1216 let decoder = setup_decoder(true).await;
1217
1218 let mut mock_state = MockProtocolSim::new();
1220
1221 mock_state
1222 .expect_clone_box()
1223 .times(1)
1224 .returning(|| {
1225 let mut cloned_mock_state = MockProtocolSim::new();
1226 cloned_mock_state
1228 .expect_delta_transition()
1229 .times(1)
1230 .returning(|_, _, _| Ok(()));
1231 cloned_mock_state
1232 .expect_clone_box()
1233 .times(1)
1234 .returning(|| Box::new(MockProtocolSim::new()));
1235 Box::new(cloned_mock_state)
1236 });
1237
1238 let pool_id =
1240 "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2".to_string();
1241 decoder
1242 .state
1243 .write()
1244 .await
1245 .states
1246 .insert(pool_id.clone(), Box::new(mock_state) as Box<dyn ProtocolSim>);
1247 decoder
1248 .state
1249 .write()
1250 .await
1251 .contracts_map
1252 .insert(
1253 Bytes::from("0xba12222222228d8ba445958a75a0704d566bf2c8").lpad(20, 0),
1254 HashSet::from([pool_id.clone()]),
1255 );
1256
1257 let msg = load_test_msg("balancer_v2_delta");
1259
1260 let _ = decoder
1262 .decode(&msg)
1263 .await
1264 .expect("decode failure");
1265
1266 }
1268
1269 #[test]
1270 fn test_generate_proxy_token_address() {
1271 let idx = 1;
1272 let generated_address =
1273 generate_proxy_token_address(idx).expect("proxy token address should be valid");
1274 assert_eq!(generated_address, address!("000000000000000000000000000000001badbabe"));
1275
1276 let idx = 123456;
1277 let generated_address =
1278 generate_proxy_token_address(idx).expect("proxy token address should be valid");
1279 assert_eq!(generated_address, address!("00000000000000000000000000001e240badbabe"));
1280 }
1281
1282 #[tokio::test(flavor = "multi_thread")]
1283 async fn test_euler_hook_low_pool_manager_balance() {
1284 let mut decoder = TychoStreamDecoder::new();
1285
1286 decoder.register_decoder_with_context::<crate::evm::protocol::uniswap_v4::state::UniswapV4State>(
1287 "uniswap_v4_hooks", DecoderContext::new().vm_traces(true)
1288 );
1289
1290 let weth = Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap();
1291 let teth = Bytes::from_str("0xd11c452fc99cf405034ee446803b6f6c1f6d5ed8").unwrap();
1292 let tokens = HashMap::from([
1293 (
1294 weth.clone(),
1295 Token::new(&weth, "WETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1296 ),
1297 (
1298 teth.clone(),
1299 Token::new(&teth, "tETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1300 ),
1301 ]);
1302
1303 decoder.set_tokens(tokens.clone()).await;
1304
1305 let msg = load_test_msg("euler_hook_snapshot");
1306 let res = decoder
1307 .decode(&msg)
1308 .await
1309 .expect("decode failure");
1310
1311 let pool_state = res
1312 .states
1313 .get("0xc70d7fbd7fcccdf726e02fed78548b40dc52502b097c7a1ee7d995f4d4396134")
1314 .expect("Couldn't find target pool");
1315 let amount_out = pool_state
1316 .get_amount_out(
1317 BigUint::from_str("1000000000000000000").unwrap(),
1318 tokens.get(&teth).unwrap(),
1319 tokens.get(&weth).unwrap(),
1320 )
1321 .expect("Get amount out failed");
1322
1323 assert_eq!(amount_out.amount, BigUint::from_str("1216190190361759119").unwrap());
1324 }
1325}