1use std::{
2 collections::{hash_map::Entry, HashMap, HashSet},
3 future::Future,
4 pin::Pin,
5 sync::Arc,
6};
7
8use alloy::primitives::{Address, U256};
9use thiserror::Error;
10use tokio::sync::{RwLock, RwLockReadGuard};
11use tracing::{debug, error, info, warn};
12use tycho_client::feed::{synchronizer::ComponentWithState, BlockHeader, FeedMessage, HeaderLike};
13use tycho_common::{
14 dto::{ChangeType, ProtocolStateDelta},
15 models::{token::Token, Chain},
16 simulation::protocol_sim::{Balances, ProtocolSim},
17 Bytes,
18};
19#[cfg(test)]
20use {
21 mockall::mock,
22 num_bigint::BigUint,
23 std::any::Any,
24 tycho_common::simulation::{
25 errors::{SimulationError, TransitionError},
26 protocol_sim::GetAmountOutResult,
27 },
28};
29
30use crate::{
31 evm::{
32 engine_db::{update_engine, SHARED_TYCHO_DB},
33 protocol::{
34 utils::bytes_to_address,
35 vm::{constants::ERC20_PROXY_BYTECODE, erc20_token::IMPLEMENTATION_SLOT},
36 },
37 tycho_models::{AccountUpdate, ResponseAccount},
38 },
39 protocol::{
40 errors::InvalidSnapshotError,
41 models::{DecoderContext, ProtocolComponent, TryFromWithBlock, Update},
42 },
43};
44
45#[derive(Error, Debug)]
46pub enum StreamDecodeError {
47 #[error("{0}")]
48 Fatal(String),
49}
50
51#[derive(Default)]
52struct DecoderState {
53 tokens: HashMap<Bytes, Token>,
54 states: HashMap<String, Box<dyn ProtocolSim>>,
55 components: HashMap<String, ProtocolComponent>,
56 contracts_map: HashMap<Bytes, HashSet<String>>,
58 proxy_token_addresses: HashMap<Address, Address>,
60 failed_components: HashSet<String>,
64}
65
66type DecodeFut =
67 Pin<Box<dyn Future<Output = Result<Box<dyn ProtocolSim>, InvalidSnapshotError>> + Send + Sync>>;
68type AccountBalances = HashMap<Bytes, HashMap<Bytes, Bytes>>;
69type RegistryFn<H> = dyn Fn(ComponentWithState, H, AccountBalances, Arc<RwLock<DecoderState>>) -> DecodeFut
70 + Send
71 + Sync;
72type FilterFn = fn(&ComponentWithState) -> bool;
73
74pub struct TychoStreamDecoder<H>
87where
88 H: HeaderLike,
89{
90 state: Arc<RwLock<DecoderState>>,
91 skip_state_decode_failures: bool,
92 min_token_quality: u32,
93 registry: HashMap<String, Box<RegistryFn<H>>>,
94 inclusion_filters: HashMap<String, FilterFn>,
95}
96
97impl<H> Default for TychoStreamDecoder<H>
98where
99 H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
100{
101 fn default() -> Self {
102 Self::new()
103 }
104}
105
106impl<H> TychoStreamDecoder<H>
107where
108 H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
109{
110 pub fn new() -> Self {
111 Self {
112 state: Arc::new(RwLock::new(DecoderState::default())),
113 skip_state_decode_failures: false,
114 min_token_quality: 51,
115 registry: HashMap::new(),
116 inclusion_filters: HashMap::new(),
117 }
118 }
119
120 pub async fn set_tokens(&self, tokens: HashMap<Bytes, Token>) {
125 let mut guard = self.state.write().await;
126 guard.tokens = tokens;
127 }
128
129 pub fn skip_state_decode_failures(&mut self, skip: bool) {
130 self.skip_state_decode_failures = skip;
131 }
132
133 pub fn register_decoder_with_context<T>(&mut self, exchange: &str, context: DecoderContext)
146 where
147 T: ProtocolSim
148 + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
149 + Send
150 + 'static,
151 {
152 let decoder = Box::new(
153 move |component: ComponentWithState,
154 header: H,
155 account_balances: AccountBalances,
156 state: Arc<RwLock<DecoderState>>| {
157 let context = context.clone();
158 Box::pin(async move {
159 let guard = state.read().await;
160 T::try_from_with_header(
161 component,
162 header,
163 &account_balances,
164 &guard.tokens,
165 &context,
166 )
167 .await
168 .map(|c| Box::new(c) as Box<dyn ProtocolSim>)
169 }) as DecodeFut
170 },
171 );
172 self.registry
173 .insert(exchange.to_string(), decoder);
174 }
175
176 pub fn register_decoder<T>(&mut self, exchange: &str)
188 where
189 T: ProtocolSim
190 + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
191 + Send
192 + 'static,
193 {
194 let context = DecoderContext::new();
195 self.register_decoder_with_context::<T>(exchange, context);
196 }
197
198 pub fn register_filter(&mut self, exchange: &str, predicate: FilterFn) {
214 self.inclusion_filters
215 .insert(exchange.to_string(), predicate);
216 }
217
218 pub async fn decode(&self, msg: &FeedMessage<H>) -> Result<Update, StreamDecodeError> {
221 let mut updated_states = HashMap::new();
223 let mut new_pairs = HashMap::new();
224 let mut removed_pairs = HashMap::new();
225 let mut contracts_map = HashMap::new();
226 let mut msg_failed_components = HashSet::new();
227
228 let header = msg
229 .state_msgs
230 .values()
231 .next()
232 .ok_or_else(|| StreamDecodeError::Fatal("Missing block!".into()))?
233 .header
234 .clone();
235
236 let block_number_or_timestamp = header
237 .clone()
238 .block_number_or_timestamp();
239 let current_block = header.clone().block();
240
241 for (protocol, protocol_msg) in msg.state_msgs.iter() {
242 if let Some(deltas) = protocol_msg.deltas.as_ref() {
244 let mut state_guard = self.state.write().await;
245
246 let new_tokens = deltas
247 .new_tokens
248 .iter()
249 .filter(|(addr, t)| {
250 t.quality >= self.min_token_quality &&
251 !state_guard.tokens.contains_key(*addr)
252 })
253 .filter_map(|(addr, t)| {
254 t.clone()
255 .try_into()
256 .map(|token| (addr.clone(), token))
257 .inspect_err(|e| {
258 warn!("Failed decoding token {e:?} {addr:#044x}");
259 *e
260 })
261 .ok()
262 })
263 .collect::<HashMap<Bytes, Token>>();
264
265 if !new_tokens.is_empty() {
266 debug!(n = new_tokens.len(), "NewTokens");
267 state_guard.tokens.extend(new_tokens);
268 }
269 }
270
271 {
273 let mut state_guard = self.state.write().await;
274 let removed_components: Vec<(String, ProtocolComponent)> = protocol_msg
275 .removed_components
276 .iter()
277 .map(|(id, comp)| {
278 if *id != comp.id {
279 error!(
280 "Component id mismatch in removed components {id} != {}",
281 comp.id
282 );
283 return Err(StreamDecodeError::Fatal("Component id mismatch".into()));
284 }
285
286 let tokens = comp
287 .tokens
288 .iter()
289 .flat_map(|addr| state_guard.tokens.get(addr).cloned())
290 .collect::<Vec<_>>();
291
292 if tokens.len() == comp.tokens.len() {
293 Ok(Some((
294 id.clone(),
295 ProtocolComponent::from_with_tokens(comp.clone(), tokens),
296 )))
297 } else {
298 Ok(None)
299 }
300 })
301 .collect::<Result<Vec<Option<(String, ProtocolComponent)>>, StreamDecodeError>>(
302 )?
303 .into_iter()
304 .flatten()
305 .collect();
306
307 for (id, component) in removed_components {
309 state_guard.components.remove(&id);
310 state_guard.states.remove(&id);
311 removed_pairs.insert(id, component);
312 }
313
314 info!(
316 "Processing {} contracts from snapshots",
317 protocol_msg
318 .snapshots
319 .get_vm_storage()
320 .len()
321 );
322
323 let mut proxy_token_accounts: HashMap<Address, AccountUpdate> = HashMap::new();
324 let mut storage_by_address: HashMap<Address, ResponseAccount> = HashMap::new();
325 for (key, value) in protocol_msg
326 .snapshots
327 .get_vm_storage()
328 .iter()
329 {
330 let account: ResponseAccount = value.clone().into();
331
332 if state_guard.tokens.contains_key(key) {
333 let original_address = account.address;
334 let (impl_addr, proxy_state) = match state_guard
343 .proxy_token_addresses
344 .get(&original_address)
345 {
346 Some(impl_addr) => {
347 let proxy_state = AccountUpdate::new(
354 original_address,
355 value.chain.into(),
356 account.slots.clone(),
357 Some(account.native_balance),
358 None,
359 ChangeType::Update,
360 );
361 (*impl_addr, proxy_state)
362 }
363 None => {
364 let impl_addr = generate_proxy_token_address(
368 state_guard.proxy_token_addresses.len() as u32,
369 )?;
370 state_guard
371 .proxy_token_addresses
372 .insert(original_address, impl_addr);
373
374 let proxy_state = create_proxy_token_account(
376 original_address,
377 Some(impl_addr),
378 &account.slots,
379 value.chain.into(),
380 Some(account.native_balance),
381 );
382
383 (impl_addr, proxy_state)
384 }
385 };
386
387 proxy_token_accounts.insert(original_address, proxy_state);
388
389 let impl_update = ResponseAccount {
391 address: impl_addr,
392 slots: HashMap::new(),
393 ..account.clone()
394 };
395 storage_by_address.insert(impl_addr, impl_update);
396 } else {
397 storage_by_address.insert(account.address, account);
399 }
400 }
401
402 info!("Updating engine with {} contracts from snapshots", storage_by_address.len());
403 update_engine(
404 SHARED_TYCHO_DB.clone(),
405 header.clone().block(),
406 Some(storage_by_address),
407 proxy_token_accounts,
408 )
409 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
410 info!("Engine updated");
411 drop(state_guard);
412 }
413
414 let account_balances = protocol_msg
417 .clone()
418 .snapshots
419 .get_vm_storage()
420 .iter()
421 .filter_map(|(addr, acc)| {
422 let balances = acc.token_balances.clone();
423 if balances.is_empty() {
424 return None;
425 }
426 Some((addr.clone(), balances))
427 })
428 .collect::<AccountBalances>();
429
430 let mut new_components = HashMap::new();
431 let mut count_token_skips = 0;
432 let mut components_to_store = HashMap::new();
433 {
434 let state_guard = self.state.read().await;
435
436 'snapshot_loop: for (id, snapshot) in protocol_msg
438 .snapshots
439 .get_states()
440 .clone()
441 {
442 if self
444 .inclusion_filters
445 .get(protocol.as_str())
446 .is_some_and(|predicate| !predicate(&snapshot))
447 {
448 continue;
449 }
450
451 let mut component_tokens = Vec::new();
453 let mut new_tokens_accounts = HashMap::new();
454 for token in snapshot.component.tokens.clone() {
455 match state_guard.tokens.get(&token) {
456 Some(token) => {
457 component_tokens.push(token.clone());
458
459 let token_address = match bytes_to_address(&token.address) {
462 Ok(addr) => addr,
463 Err(_) => {
464 warn!(
465 "Token address could not be decoded {}, ignoring pool {:x?}",
466 token.address, id
467 );
468 continue 'snapshot_loop;
469 }
470 };
471 if !state_guard
473 .proxy_token_addresses
474 .contains_key(&token_address)
475 {
476 new_tokens_accounts.insert(
477 token_address,
478 create_proxy_token_account(
479 token_address,
480 None,
481 &HashMap::new(),
482 snapshot.component.chain.into(),
483 None,
484 ),
485 );
486 }
487 }
488 None => {
489 count_token_skips += 1;
490 debug!("Token not found {}, ignoring pool {:x?}", token, id);
491 continue 'snapshot_loop;
492 }
493 }
494 }
495 let component = ProtocolComponent::from_with_tokens(
496 snapshot.component.clone(),
497 component_tokens,
498 );
499
500 if !new_tokens_accounts.is_empty() {
502 update_engine(
503 SHARED_TYCHO_DB.clone(),
504 header.clone().block(),
505 None,
506 new_tokens_accounts,
507 )
508 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
509 }
510
511 if !component
514 .static_attributes
515 .contains_key("manual_updates")
516 {
517 for contract in &component.contract_ids {
518 contracts_map
519 .entry(contract.clone())
520 .or_insert_with(HashSet::new)
521 .insert(id.clone());
522 }
523 for (_, tracing) in snapshot.entrypoints.iter() {
526 for contract in tracing.accessed_slots.keys().cloned() {
527 contracts_map
528 .entry(contract)
529 .or_insert_with(HashSet::new)
530 .insert(id.clone());
531 }
532 }
533 }
534
535 new_pairs.insert(id.clone(), component.clone());
537
538 components_to_store.insert(id.clone(), component);
540
541 if let Some(state_decode_f) = self.registry.get(protocol.as_str()) {
543 match state_decode_f(
544 snapshot,
545 header.clone(),
546 account_balances.clone(),
547 self.state.clone(),
548 )
549 .await
550 {
551 Ok(state) => {
552 new_components.insert(id.clone(), state);
553 }
554 Err(e) => {
555 if self.skip_state_decode_failures {
556 warn!(pool = id, error = %e, "StateDecodingFailure");
557 msg_failed_components.insert(id.clone());
558 continue 'snapshot_loop;
559 } else {
560 error!(pool = id, error = %e, "StateDecodingFailure");
561 return Err(StreamDecodeError::Fatal(format!("{e}")));
562 }
563 }
564 }
565 } else if self.skip_state_decode_failures {
566 warn!(pool = id, "MissingDecoderRegistration");
567 msg_failed_components.insert(id.clone());
568 continue 'snapshot_loop;
569 } else {
570 error!(pool = id, "MissingDecoderRegistration");
571 return Err(StreamDecodeError::Fatal(format!(
572 "Missing decoder registration for: {id}"
573 )));
574 }
575 }
576 }
577
578 if !components_to_store.is_empty() {
580 let mut state_guard = self.state.write().await;
581 for (id, component) in components_to_store {
582 state_guard
583 .components
584 .insert(id, component);
585 }
586 }
587
588 if !protocol_msg.snapshots.states.is_empty() {
589 info!("Decoded {} snapshots for protocol {protocol}", new_components.len());
590 }
591 if count_token_skips > 0 {
592 info!("Skipped {count_token_skips} pools due to missing tokens");
593 }
594
595 updated_states.extend(new_components);
597
598 if let Some(deltas) = protocol_msg.deltas.clone() {
600 let mut state_guard = self.state.write().await;
602
603 let mut account_update_by_address: HashMap<Address, AccountUpdate> = HashMap::new();
604 for (key, value) in deltas.account_updates.iter() {
605 let mut update: AccountUpdate = value.clone().into();
606
607 if update.code.is_none() && matches!(update.change, ChangeType::Creation) {
613 error!(
614 update = ?update,
615 "FaultyCreationDelta"
616 );
617 update.change = ChangeType::Update;
618 }
619
620 if state_guard.tokens.contains_key(key) {
621 let original_address = update.address;
622 let impl_addr = match state_guard
629 .proxy_token_addresses
630 .get(&original_address)
631 {
632 Some(impl_addr) => {
633 let proxy_update = AccountUpdate { code: None, ..update.clone() };
637 account_update_by_address.insert(original_address, proxy_update);
638
639 *impl_addr
640 }
641 None => {
642 let impl_addr = generate_proxy_token_address(
647 state_guard.proxy_token_addresses.len() as u32,
648 )?;
649 state_guard
650 .proxy_token_addresses
651 .insert(original_address, impl_addr);
652
653 let proxy_state = create_proxy_token_account(
656 original_address,
657 Some(impl_addr),
658 &update.slots,
659 update.chain,
660 update.balance,
661 );
662 account_update_by_address.insert(original_address, proxy_state);
663
664 impl_addr
665 }
666 };
667
668 if update.code.is_some() {
670 let impl_update = AccountUpdate {
671 address: impl_addr,
672 slots: HashMap::new(),
673 ..update.clone()
674 };
675 account_update_by_address.insert(impl_addr, impl_update);
676 }
677 } else {
678 account_update_by_address.insert(update.address, update);
680 }
681 }
682 drop(state_guard);
683
684 let state_guard = self.state.read().await;
685 info!("Updating engine with {} contract deltas", deltas.account_updates.len());
686 update_engine(
687 SHARED_TYCHO_DB.clone(),
688 header.clone().block(),
689 None,
690 account_update_by_address,
691 )
692 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
693 info!("Engine updated");
694
695 let mut pools_to_update = HashSet::new();
697 for (account, _update) in deltas.account_updates {
698 pools_to_update.extend(
700 contracts_map
701 .get(&account)
702 .cloned()
703 .unwrap_or_default(),
704 );
705 pools_to_update.extend(
707 state_guard
708 .contracts_map
709 .get(&account)
710 .cloned()
711 .unwrap_or_default(),
712 );
713 }
714
715 let all_balances = Balances {
717 component_balances: deltas
718 .component_balances
719 .iter()
720 .map(|(pool_id, bals)| {
721 let mut balances = HashMap::new();
722 for (t, b) in &bals.0 {
723 balances.insert(t.clone(), b.balance.clone());
724 }
725 pools_to_update.insert(pool_id.clone());
726 (pool_id.clone(), balances)
727 })
728 .collect(),
729 account_balances: deltas
730 .account_balances
731 .iter()
732 .map(|(account, bals)| {
733 let mut balances = HashMap::new();
734 for (t, b) in bals {
735 balances.insert(t.clone(), b.balance.clone());
736 }
737 pools_to_update.extend(
738 contracts_map
739 .get(account)
740 .cloned()
741 .unwrap_or_default(),
742 );
743 (account.clone(), balances)
744 })
745 .collect(),
746 };
747
748 for (id, update) in deltas.state_updates {
750 let update_with_block =
752 Self::add_block_info_to_delta(update, current_block.clone());
753 match Self::apply_update(
754 &id,
755 update_with_block,
756 &mut updated_states,
757 &state_guard,
758 &all_balances,
759 ) {
760 Ok(_) => {
761 pools_to_update.remove(&id);
762 }
763 Err(e) => {
764 if self.skip_state_decode_failures {
765 warn!(pool = id, error = %e, "Failed to apply state update, marking component as removed");
766 updated_states.remove(&id);
768 if let Some(component) = new_pairs.remove(&id) {
770 removed_pairs.insert(id.clone(), component);
771 } else if let Some(component) = state_guard.components.get(&id) {
772 removed_pairs.insert(id.clone(), component.clone());
773 } else {
774 warn!(pool = id, "Component not found in new_pairs or state, cannot add to removed_pairs");
777 }
778 pools_to_update.remove(&id);
779
780 msg_failed_components.insert(id.clone());
782 } else {
783 return Err(e);
784 }
785 }
786 }
787 }
788
789 for pool in pools_to_update {
791 let default_delta_with_block = Self::add_block_info_to_delta(
793 ProtocolStateDelta::default(),
794 current_block.clone(),
795 );
796 match Self::apply_update(
797 &pool,
798 default_delta_with_block,
799 &mut updated_states,
800 &state_guard,
801 &all_balances,
802 ) {
803 Ok(_) => {}
804 Err(e) => {
805 if self.skip_state_decode_failures {
806 warn!(pool = pool, error = %e, "Failed to apply contract/balance update, marking component as removed");
807 updated_states.remove(&pool);
809 if let Some(component) = new_pairs.remove(&pool) {
811 removed_pairs.insert(pool.clone(), component);
812 } else if let Some(component) = state_guard.components.get(&pool) {
813 removed_pairs.insert(pool.clone(), component.clone());
814 } else {
815 warn!(pool = pool, "Component not found in new_pairs or state, cannot add to removed_pairs");
818 }
819
820 msg_failed_components.insert(pool.clone());
822 } else {
823 return Err(e);
824 }
825 }
826 }
827 }
828 };
829 }
830
831 let mut state_guard = self.state.write().await;
833
834 state_guard
836 .failed_components
837 .extend(msg_failed_components);
838
839 updated_states.retain(|id, _| {
843 !state_guard
844 .failed_components
845 .contains(id)
846 });
847 new_pairs.retain(|id, _| {
848 !state_guard
849 .failed_components
850 .contains(id)
851 });
852
853 state_guard
854 .states
855 .extend(updated_states.clone().into_iter());
856
857 for (id, component) in new_pairs.iter() {
859 state_guard
860 .components
861 .insert(id.clone(), component.clone());
862 }
863
864 for (id, _) in removed_pairs.iter() {
866 state_guard.components.remove(id);
867 }
868
869 for (key, values) in contracts_map {
870 state_guard
871 .contracts_map
872 .entry(key)
873 .or_insert_with(HashSet::new)
874 .extend(values);
875 }
876
877 Ok(Update::new(block_number_or_timestamp, updated_states, new_pairs)
879 .set_removed_pairs(removed_pairs)
880 .set_sync_states(msg.sync_states.clone()))
881 }
882
883 fn add_block_info_to_delta(
885 mut delta: ProtocolStateDelta,
886 block_header_opt: Option<BlockHeader>,
887 ) -> ProtocolStateDelta {
888 if let Some(header) = block_header_opt {
889 delta.updated_attributes.insert(
892 "block_number".to_string(),
893 Bytes::from(header.number.to_be_bytes().to_vec()),
894 );
895 delta.updated_attributes.insert(
896 "block_timestamp".to_string(),
897 Bytes::from(header.timestamp.to_be_bytes().to_vec()),
898 );
899 }
900 delta
901 }
902
903 fn apply_update(
904 id: &String,
905 update: ProtocolStateDelta,
906 updated_states: &mut HashMap<String, Box<dyn ProtocolSim>>,
907 state_guard: &RwLockReadGuard<'_, DecoderState>,
908 all_balances: &Balances,
909 ) -> Result<(), StreamDecodeError> {
910 match updated_states.entry(id.clone()) {
911 Entry::Occupied(mut entry) => {
912 let state: &mut Box<dyn ProtocolSim> = entry.get_mut();
914 state
915 .delta_transition(update, &state_guard.tokens, all_balances)
916 .map_err(|e| {
917 error!(pool = id, error = ?e, "DeltaTransitionError");
918 StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
919 })?;
920 }
921 Entry::Vacant(_) => {
922 match state_guard.states.get(id) {
923 Some(stored_state) => {
926 let mut state = stored_state.clone();
927 state
928 .delta_transition(update, &state_guard.tokens, all_balances)
929 .map_err(|e| {
930 error!(pool = id, error = ?e, "DeltaTransitionError");
931 StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
932 })?;
933 updated_states.insert(id.clone(), state);
934 }
935 None => debug!(pool = id, reason = "MissingState", "DeltaTransitionError"),
936 }
937 }
938 }
939 Ok(())
940 }
941}
942
943fn generate_proxy_token_address(idx: u32) -> Result<Address, StreamDecodeError> {
945 let padded_idx = format!("{idx:x}");
946 let padded_zeroes = "0".repeat(33 - padded_idx.len());
947 let proxy_token_address = format!("{padded_zeroes}{padded_idx}BAdbaBe");
948 let decoded = hex::decode(proxy_token_address).map_err(|e| {
949 StreamDecodeError::Fatal(format!("Invalid proxy token address encoding: {e}"))
950 })?;
951
952 const ADDRESS_LENGTH: usize = 20;
953 if decoded.len() != ADDRESS_LENGTH {
954 return Err(StreamDecodeError::Fatal(format!(
955 "Invalid proxy token address length: expected {}, got {}",
956 ADDRESS_LENGTH,
957 decoded.len(),
958 )));
959 }
960
961 Ok(Address::from_slice(&decoded))
962}
963
964fn create_proxy_token_account(
969 addr: Address,
970 new_address: Option<Address>,
971 storage: &HashMap<U256, U256>,
972 chain: Chain,
973 balance: Option<U256>,
974) -> AccountUpdate {
975 let mut slots = storage.clone();
976 if let Some(new_address) = new_address {
977 slots.insert(*IMPLEMENTATION_SLOT, U256::from_be_slice(new_address.as_slice()));
978 }
979
980 AccountUpdate {
981 address: addr,
982 chain,
983 slots,
984 balance,
985 code: Some(ERC20_PROXY_BYTECODE.to_vec()),
986 change: ChangeType::Creation,
987 }
988}
989
// Test-only mock of a protocol simulation state. `mock!` generates the `MockProtocolSim` struct
// with one mockable method (and matching `expect_*` method) per signature listed below; the
// `ProtocolSim` trait itself is implemented for it separately.
#[cfg(test)]
mock! {
    #[derive(Debug)]
    pub ProtocolSim {
        pub fn fee(&self) -> f64;
        pub fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError>;
        pub fn get_amount_out(
            &self,
            amount_in: BigUint,
            token_in: &Token,
            token_out: &Token,
        ) -> Result<GetAmountOutResult, SimulationError>;
        pub fn get_limits(
            &self,
            sell_token: Bytes,
            buy_token: Bytes,
        ) -> Result<(BigUint, BigUint), SimulationError>;
        pub fn delta_transition(
            &mut self,
            delta: ProtocolStateDelta,
            tokens: &HashMap<Bytes, Token>,
            balances: &Balances,
        ) -> Result<(), TransitionError<String>>;
        pub fn clone_box(&self) -> Box<dyn ProtocolSim>;
        pub fn eq(&self, other: &dyn ProtocolSim) -> bool;
    }
}
1017
/// `ProtocolSim` implementation for the mock, so it can be stored as `Box<dyn ProtocolSim>`.
///
/// Every method except `as_any`/`as_any_mut` forwards to the mocked method of the same name.
/// These calls do not recurse: `self.method(..)` resolves to the inherent method generated by
/// `mock!` because inherent methods take precedence over trait methods.
#[cfg(test)]
impl ProtocolSim for MockProtocolSim {
    fn fee(&self) -> f64 {
        self.fee()
    }

    fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError> {
        self.spot_price(base, quote)
    }

    fn get_amount_out(
        &self,
        amount_in: BigUint,
        token_in: &Token,
        token_out: &Token,
    ) -> Result<GetAmountOutResult, SimulationError> {
        self.get_amount_out(amount_in, token_in, token_out)
    }

    fn get_limits(
        &self,
        sell_token: Bytes,
        buy_token: Bytes,
    ) -> Result<(BigUint, BigUint), SimulationError> {
        self.get_limits(sell_token, buy_token)
    }

    fn delta_transition(
        &mut self,
        delta: ProtocolStateDelta,
        tokens: &HashMap<Bytes, Token>,
        balances: &Balances,
    ) -> Result<(), TransitionError<String>> {
        self.delta_transition(delta, tokens, balances)
    }

    fn clone_box(&self) -> Box<dyn ProtocolSim> {
        self.clone_box()
    }

    // Downcasting is not mocked; any use of it from a test panics.
    fn as_any(&self) -> &dyn Any {
        panic!("MockProtocolSim does not support as_any")
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        panic!("MockProtocolSim does not support as_any_mut")
    }

    fn eq(&self, other: &dyn ProtocolSim) -> bool {
        self.eq(other)
    }
}
1070
#[cfg(test)]
mod tests {
    use std::{fs, path::Path, str::FromStr};

    use alloy::primitives::address;
    use mockall::predicate::*;
    use rstest::*;
    use tycho_client::feed::BlockHeader;
    use tycho_common::{models::Chain, Bytes};

    use super::*;
    use crate::evm::protocol::uniswap_v2::state::UniswapV2State;

    /// Builds a token map for the given addresses; each token uses its hex address as symbol.
    fn tokens_for(addresses: &[Bytes]) -> HashMap<Bytes, Token> {
        let mut tokens = HashMap::new();
        for addr in addresses {
            let addr_str = format!("{addr:x}");
            let token =
                Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100);
            tokens.insert(addr.clone(), token);
        }
        tokens
    }

    /// Returns a decoder with the Uniswap V2 decoder registered and, if requested, both tokens
    /// of the test pool preloaded.
    async fn setup_decoder(set_tokens: bool) -> TychoStreamDecoder<BlockHeader> {
        let mut decoder = TychoStreamDecoder::new();
        decoder.register_decoder::<UniswapV2State>("uniswap_v2");
        if set_tokens {
            let tokens = tokens_for(&[
                Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0),
                Bytes::from("0xdac17f958d2ee523a2206206994597c13d831ec7").lpad(20, 0),
            ]);
            decoder.set_tokens(tokens).await;
        }
        decoder
    }

    /// Loads `tests/assets/decoder/{name}.json` and deserializes it into a feed message.
    fn load_test_msg(name: &str) -> FeedMessage<BlockHeader> {
        let project_root = env!("CARGO_MANIFEST_DIR");
        let asset_path = Path::new(project_root).join(format!("tests/assets/decoder/{name}.json"));
        let json_data = fs::read_to_string(asset_path).expect("Failed to read test asset");
        serde_json::from_str(&json_data).expect("Failed to deserialize FeedMsg json!")
    }

    #[tokio::test]
    async fn test_decode() {
        let decoder = setup_decoder(true).await;

        let snapshot_msg = load_test_msg("uniswap_v2_snapshot");
        let res1 = decoder
            .decode(&snapshot_msg)
            .await
            .expect("decode failure");
        let delta_msg = load_test_msg("uniswap_v2_delta");
        let res2 = decoder
            .decode(&delta_msg)
            .await
            .expect("decode failure");

        assert_eq!(res1.states.len(), 1);
        assert_eq!(res2.states.len(), 1);
        assert_eq!(res1.sync_states.len(), 1);
        assert_eq!(res2.sync_states.len(), 1);
    }

    #[tokio::test]
    async fn test_decode_component_missing_token() {
        let decoder = setup_decoder(false).await;
        // Only one of the pool's two tokens is known.
        let tokens =
            tokens_for(&[Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0)]);
        decoder.set_tokens(tokens).await;

        let msg = load_test_msg("uniswap_v2_snapshot");
        let res1 = decoder
            .decode(&msg)
            .await
            .expect("decode failure");

        assert_eq!(res1.states.len(), 0);
    }

    #[tokio::test]
    async fn test_decode_component_bad_id() {
        let decoder = setup_decoder(true).await;
        let msg = load_test_msg("uniswap_v2_snapshot_broken_id");

        let Err(StreamDecodeError::Fatal(reason)) = decoder.decode(&msg).await else {
            panic!("Expected failures to be raised")
        };
        assert_eq!(reason, "Component id mismatch");
    }

    #[rstest]
    #[case(true)]
    #[case(false)]
    #[tokio::test]
    async fn test_decode_component_bad_state(#[case] skip_failures: bool) {
        let mut decoder = setup_decoder(true).await;
        decoder.skip_state_decode_failures(skip_failures);

        let msg = load_test_msg("uniswap_v2_snapshot_broken_state");
        match (decoder.decode(&msg).await, skip_failures) {
            (Err(StreamDecodeError::Fatal(msg)), false) => {
                assert_eq!(msg, "Missing attributes reserve0");
            }
            (Err(StreamDecodeError::Fatal(msg)), true) => {
                panic!("Expected failures to be ignored. Err: {msg}")
            }
            (Ok(_), false) => panic!("Expected failures to be raised"),
            (Ok(res), true) => assert_eq!(res.states.len(), 0),
        }
    }

    #[tokio::test]
    async fn test_decode_updates_state_on_contract_change() {
        let decoder = setup_decoder(true).await;

        // The stored state must be cloned exactly once; the clone must receive exactly one
        // delta transition and be cloned once more when the result is persisted.
        let mut mock_state = MockProtocolSim::new();
        mock_state
            .expect_clone_box()
            .times(1)
            .returning(|| {
                let mut cloned_mock_state = MockProtocolSim::new();
                cloned_mock_state
                    .expect_delta_transition()
                    .times(1)
                    .returning(|_, _, _| Ok(()));
                cloned_mock_state
                    .expect_clone_box()
                    .times(1)
                    .returning(|| Box::new(MockProtocolSim::new()));
                Box::new(cloned_mock_state)
            });

        let pool_id =
            "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2".to_string();
        {
            // Seed the decoder with the mocked state and link it to the contract that the
            // delta message touches; the guard is released before decoding.
            let mut state = decoder.state.write().await;
            state
                .states
                .insert(pool_id.clone(), Box::new(mock_state) as Box<dyn ProtocolSim>);
            state.contracts_map.insert(
                Bytes::from("0xba12222222228d8ba445958a75a0704d566bf2c8").lpad(20, 0),
                HashSet::from([pool_id.clone()]),
            );
        }

        let msg = load_test_msg("balancer_v2_delta");

        // The mock expectations are verified when the mocks are dropped.
        let _ = decoder
            .decode(&msg)
            .await
            .expect("decode failure");
    }

    #[test]
    fn test_generate_proxy_token_address() {
        let first =
            generate_proxy_token_address(1).expect("proxy token address should be valid");
        assert_eq!(first, address!("000000000000000000000000000000001badbabe"));

        let larger =
            generate_proxy_token_address(123456).expect("proxy token address should be valid");
        assert_eq!(larger, address!("00000000000000000000000000001e240badbabe"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_euler_hook_low_pool_manager_balance() {
        let mut decoder = TychoStreamDecoder::new();

        decoder
            .register_decoder_with_context::<crate::evm::protocol::uniswap_v4::state::UniswapV4State>(
                "uniswap_v4_hooks",
                DecoderContext::new().vm_traces(true),
            );

        let weth = Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap();
        let teth = Bytes::from_str("0xd11c452fc99cf405034ee446803b6f6c1f6d5ed8").unwrap();
        let tokens = HashMap::from([
            (
                weth.clone(),
                Token::new(&weth, "WETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
            ),
            (
                teth.clone(),
                Token::new(&teth, "tETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
            ),
        ]);

        decoder.set_tokens(tokens.clone()).await;

        let msg = load_test_msg("euler_hook_snapshot");
        let res = decoder
            .decode(&msg)
            .await
            .expect("decode failure");

        let pool_state = res
            .states
            .get("0xc70d7fbd7fcccdf726e02fed78548b40dc52502b097c7a1ee7d995f4d4396134")
            .expect("Couldn't find target pool");
        let amount_out = pool_state
            .get_amount_out(
                BigUint::from_str("1000000000000000000").unwrap(),
                tokens.get(&teth).unwrap(),
                tokens.get(&weth).unwrap(),
            )
            .expect("Get amount out failed");

        assert_eq!(amount_out.amount, BigUint::from_str("1216190190361759119").unwrap());
    }
}