1use std::{
2 collections::{hash_map::Entry, HashMap, HashSet},
3 future::Future,
4 pin::Pin,
5 sync::Arc,
6};
7
8use alloy::primitives::{Address, U256};
9use thiserror::Error;
10use tokio::sync::{RwLock, RwLockReadGuard};
11use tracing::{debug, error, info, warn};
12use tycho_client::feed::{synchronizer::ComponentWithState, BlockHeader, FeedMessage, HeaderLike};
13use tycho_common::{
14 dto::{ChangeType, ProtocolStateDelta},
15 models::{token::Token, Chain},
16 simulation::protocol_sim::{Balances, ProtocolSim},
17 Bytes,
18};
19#[cfg(test)]
20use {
21 mockall::mock,
22 num_bigint::BigUint,
23 std::any::Any,
24 tycho_common::simulation::{
25 errors::{SimulationError, TransitionError},
26 protocol_sim::GetAmountOutResult,
27 },
28};
29
30use crate::{
31 evm::{
32 engine_db::{update_engine, SHARED_TYCHO_DB},
33 protocol::{
34 utils::bytes_to_address,
35 vm::{constants::ERC20_PROXY_BYTECODE, erc20_token::IMPLEMENTATION_SLOT},
36 },
37 tycho_models::{AccountUpdate, ResponseAccount},
38 },
39 protocol::{
40 errors::InvalidSnapshotError,
41 models::{DecoderContext, ProtocolComponent, TryFromWithBlock, Update},
42 },
43};
44
45#[derive(Error, Debug)]
46pub enum StreamDecodeError {
47 #[error("{0}")]
48 Fatal(String),
49}
50
51#[derive(Default)]
52struct DecoderState {
53 tokens: HashMap<Bytes, Token>,
54 states: HashMap<String, Box<dyn ProtocolSim>>,
55 components: HashMap<String, ProtocolComponent>,
56 contracts_map: HashMap<Bytes, HashSet<String>>,
58 proxy_token_addresses: HashMap<Address, Address>,
60 failed_components: HashSet<String>,
64}
65
66type DecodeFut =
67 Pin<Box<dyn Future<Output = Result<Box<dyn ProtocolSim>, InvalidSnapshotError>> + Send + Sync>>;
68type AccountBalances = HashMap<Bytes, HashMap<Bytes, Bytes>>;
69type RegistryFn<H> = dyn Fn(ComponentWithState, H, AccountBalances, Arc<RwLock<DecoderState>>) -> DecodeFut
70 + Send
71 + Sync;
72type FilterFn = fn(&ComponentWithState) -> bool;
73
74pub struct TychoStreamDecoder<H>
87where
88 H: HeaderLike,
89{
90 state: Arc<RwLock<DecoderState>>,
91 skip_state_decode_failures: bool,
92 min_token_quality: u32,
93 registry: HashMap<String, Box<RegistryFn<H>>>,
94 inclusion_filters: HashMap<String, FilterFn>,
95}
96
97impl<H> Default for TychoStreamDecoder<H>
98where
99 H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
100{
101 fn default() -> Self {
102 Self::new()
103 }
104}
105
106impl<H> TychoStreamDecoder<H>
107where
108 H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
109{
110 pub fn new() -> Self {
111 Self {
112 state: Arc::new(RwLock::new(DecoderState::default())),
113 skip_state_decode_failures: false,
114 min_token_quality: 100,
115 registry: HashMap::new(),
116 inclusion_filters: HashMap::new(),
117 }
118 }
119
120 pub async fn set_tokens(&self, tokens: HashMap<Bytes, Token>) {
125 let mut guard = self.state.write().await;
126 guard.tokens = tokens;
127 }
128
129 pub fn skip_state_decode_failures(&mut self, skip: bool) {
130 self.skip_state_decode_failures = skip;
131 }
132
133 pub fn min_token_quality(&mut self, quality: u32) {
138 self.min_token_quality = quality;
139 }
140
141 pub fn register_decoder_with_context<T>(&mut self, exchange: &str, context: DecoderContext)
154 where
155 T: ProtocolSim
156 + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
157 + Send
158 + 'static,
159 {
160 let decoder = Box::new(
161 move |component: ComponentWithState,
162 header: H,
163 account_balances: AccountBalances,
164 state: Arc<RwLock<DecoderState>>| {
165 let context = context.clone();
166 Box::pin(async move {
167 let guard = state.read().await;
168 T::try_from_with_header(
169 component,
170 header,
171 &account_balances,
172 &guard.tokens,
173 &context,
174 )
175 .await
176 .map(|c| Box::new(c) as Box<dyn ProtocolSim>)
177 }) as DecodeFut
178 },
179 );
180 self.registry
181 .insert(exchange.to_string(), decoder);
182 }
183
184 pub fn register_decoder<T>(&mut self, exchange: &str)
196 where
197 T: ProtocolSim
198 + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
199 + Send
200 + 'static,
201 {
202 let context = DecoderContext::new();
203 self.register_decoder_with_context::<T>(exchange, context);
204 }
205
206 pub fn register_filter(&mut self, exchange: &str, predicate: FilterFn) {
222 self.inclusion_filters
223 .insert(exchange.to_string(), predicate);
224 }
225
226 pub async fn decode(&self, msg: &FeedMessage<H>) -> Result<Update, StreamDecodeError> {
229 let mut updated_states = HashMap::new();
231 let mut new_pairs = HashMap::new();
232 let mut removed_pairs = HashMap::new();
233 let mut contracts_map = HashMap::new();
234 let mut msg_failed_components = HashSet::new();
235
236 let header = msg
237 .state_msgs
238 .values()
239 .next()
240 .ok_or_else(|| StreamDecodeError::Fatal("Missing block!".into()))?
241 .header
242 .clone();
243
244 let block_number_or_timestamp = header
245 .clone()
246 .block_number_or_timestamp();
247 let current_block = header.clone().block();
248
249 for (protocol, protocol_msg) in msg.state_msgs.iter() {
250 if let Some(deltas) = protocol_msg.deltas.as_ref() {
252 let mut state_guard = self.state.write().await;
253
254 let new_tokens = deltas
255 .new_tokens
256 .iter()
257 .filter(|(addr, t)| {
258 t.quality >= self.min_token_quality &&
259 !state_guard.tokens.contains_key(*addr)
260 })
261 .filter_map(|(addr, t)| {
262 t.clone()
263 .try_into()
264 .map(|token| (addr.clone(), token))
265 .inspect_err(|e| {
266 warn!("Failed decoding token {e:?} {addr:#044x}");
267 *e
268 })
269 .ok()
270 })
271 .collect::<HashMap<Bytes, Token>>();
272
273 if !new_tokens.is_empty() {
274 debug!(n = new_tokens.len(), "NewTokens");
275 state_guard.tokens.extend(new_tokens);
276 }
277 }
278
279 {
281 let mut state_guard = self.state.write().await;
282 let removed_components: Vec<(String, ProtocolComponent)> = protocol_msg
283 .removed_components
284 .iter()
285 .map(|(id, comp)| {
286 if *id != comp.id {
287 error!(
288 "Component id mismatch in removed components {id} != {}",
289 comp.id
290 );
291 return Err(StreamDecodeError::Fatal("Component id mismatch".into()));
292 }
293
294 let tokens = comp
295 .tokens
296 .iter()
297 .flat_map(|addr| state_guard.tokens.get(addr).cloned())
298 .collect::<Vec<_>>();
299
300 if tokens.len() == comp.tokens.len() {
301 Ok(Some((
302 id.clone(),
303 ProtocolComponent::from_with_tokens(comp.clone(), tokens),
304 )))
305 } else {
306 Ok(None)
307 }
308 })
309 .collect::<Result<Vec<Option<(String, ProtocolComponent)>>, StreamDecodeError>>(
310 )?
311 .into_iter()
312 .flatten()
313 .collect();
314
315 for (id, component) in removed_components {
317 state_guard.components.remove(&id);
318 state_guard.states.remove(&id);
319 removed_pairs.insert(id, component);
320 }
321
322 info!(
324 "Processing {} contracts from snapshots",
325 protocol_msg
326 .snapshots
327 .get_vm_storage()
328 .len()
329 );
330
331 let mut proxy_token_accounts: HashMap<Address, AccountUpdate> = HashMap::new();
332 let mut storage_by_address: HashMap<Address, ResponseAccount> = HashMap::new();
333 for (key, value) in protocol_msg
334 .snapshots
335 .get_vm_storage()
336 .iter()
337 {
338 let account: ResponseAccount = value.clone().into();
339
340 if state_guard.tokens.contains_key(key) {
341 let original_address = account.address;
342 let (impl_addr, proxy_state) = match state_guard
351 .proxy_token_addresses
352 .get(&original_address)
353 {
354 Some(impl_addr) => {
355 let proxy_state = AccountUpdate::new(
362 original_address,
363 value.chain.into(),
364 account.slots.clone(),
365 Some(account.native_balance),
366 None,
367 ChangeType::Update,
368 );
369 (*impl_addr, proxy_state)
370 }
371 None => {
372 let impl_addr = generate_proxy_token_address(
376 state_guard.proxy_token_addresses.len() as u32,
377 )?;
378 state_guard
379 .proxy_token_addresses
380 .insert(original_address, impl_addr);
381
382 let proxy_state = create_proxy_token_account(
384 original_address,
385 Some(impl_addr),
386 &account.slots,
387 value.chain.into(),
388 Some(account.native_balance),
389 );
390
391 (impl_addr, proxy_state)
392 }
393 };
394
395 proxy_token_accounts.insert(original_address, proxy_state);
396
397 let impl_update = ResponseAccount {
399 address: impl_addr,
400 slots: HashMap::new(),
401 ..account.clone()
402 };
403 storage_by_address.insert(impl_addr, impl_update);
404 } else {
405 storage_by_address.insert(account.address, account);
407 }
408 }
409
410 let mut proxy_creates: Vec<AccountUpdate> = Vec::new();
414 let mut proxy_updates: HashMap<Address, AccountUpdate> = HashMap::new();
415 for (addr, update) in proxy_token_accounts {
416 if matches!(update.change, ChangeType::Creation) {
417 proxy_creates.push(update);
418 } else {
419 proxy_updates.insert(addr, update);
420 }
421 }
422
423 info!("Updating engine with {} contracts from snapshots", storage_by_address.len());
424 update_engine(
425 SHARED_TYCHO_DB.clone(),
426 header.clone().block(),
427 Some(storage_by_address),
428 proxy_updates,
429 )
430 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
431
432 if !proxy_creates.is_empty() {
436 SHARED_TYCHO_DB
437 .force_update_accounts(proxy_creates)
438 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
439 }
440 info!("Engine updated");
441 drop(state_guard);
442 }
443
444 let account_balances = protocol_msg
447 .clone()
448 .snapshots
449 .get_vm_storage()
450 .iter()
451 .filter_map(|(addr, acc)| {
452 let balances = acc.token_balances.clone();
453 if balances.is_empty() {
454 return None;
455 }
456 Some((addr.clone(), balances))
457 })
458 .collect::<AccountBalances>();
459
460 let mut new_components = HashMap::new();
461 let mut count_token_skips = 0;
462 let mut components_to_store = HashMap::new();
463 {
464 let state_guard = self.state.read().await;
465
466 'snapshot_loop: for (id, snapshot) in protocol_msg
468 .snapshots
469 .get_states()
470 .clone()
471 {
472 if self
474 .inclusion_filters
475 .get(protocol.as_str())
476 .is_some_and(|predicate| !predicate(&snapshot))
477 {
478 continue;
479 }
480
481 let mut component_tokens = Vec::new();
483 let mut new_tokens_accounts = HashMap::new();
484 for token in snapshot.component.tokens.clone() {
485 match state_guard.tokens.get(&token) {
486 Some(token) => {
487 component_tokens.push(token.clone());
488
489 let token_address = match bytes_to_address(&token.address) {
492 Ok(addr) => addr,
493 Err(_) => {
494 count_token_skips += 1;
495 msg_failed_components.insert(id.clone());
496 warn!(
497 "Token address could not be decoded {}, ignoring pool {:x?}",
498 token.address, id
499 );
500 continue 'snapshot_loop;
501 }
502 };
503 if !state_guard
505 .proxy_token_addresses
506 .contains_key(&token_address)
507 {
508 new_tokens_accounts.insert(
509 token_address,
510 create_proxy_token_account(
511 token_address,
512 None,
513 &HashMap::new(),
514 snapshot.component.chain.into(),
515 None,
516 ),
517 );
518 }
519 }
520 None => {
521 count_token_skips += 1;
522 msg_failed_components.insert(id.clone());
523 debug!("Token not found {}, ignoring pool {:x?}", token, id);
524 continue 'snapshot_loop;
525 }
526 }
527 }
528 let component = ProtocolComponent::from_with_tokens(
529 snapshot.component.clone(),
530 component_tokens,
531 );
532
533 if !new_tokens_accounts.is_empty() {
535 update_engine(
536 SHARED_TYCHO_DB.clone(),
537 header.clone().block(),
538 None,
539 new_tokens_accounts,
540 )
541 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
542 }
543
544 if !component
547 .static_attributes
548 .contains_key("manual_updates")
549 {
550 for contract in &component.contract_ids {
551 contracts_map
552 .entry(contract.clone())
553 .or_insert_with(HashSet::new)
554 .insert(id.clone());
555 }
556 for (_, tracing) in snapshot.entrypoints.iter() {
559 for contract in tracing.accessed_slots.keys().cloned() {
560 contracts_map
561 .entry(contract)
562 .or_insert_with(HashSet::new)
563 .insert(id.clone());
564 }
565 }
566 }
567
568 new_pairs.insert(id.clone(), component.clone());
570
571 components_to_store.insert(id.clone(), component);
573
574 if let Some(state_decode_f) = self.registry.get(protocol.as_str()) {
576 match state_decode_f(
577 snapshot,
578 header.clone(),
579 account_balances.clone(),
580 self.state.clone(),
581 )
582 .await
583 {
584 Ok(state) => {
585 new_components.insert(id.clone(), state);
586 }
587 Err(e) => {
588 if self.skip_state_decode_failures {
589 warn!(pool = id, error = %e, "StateDecodingFailure");
590 msg_failed_components.insert(id.clone());
591 continue 'snapshot_loop;
592 } else {
593 error!(pool = id, error = %e, "StateDecodingFailure");
594 return Err(StreamDecodeError::Fatal(format!("{e}")));
595 }
596 }
597 }
598 } else if self.skip_state_decode_failures {
599 warn!(pool = id, "MissingDecoderRegistration");
600 msg_failed_components.insert(id.clone());
601 continue 'snapshot_loop;
602 } else {
603 error!(pool = id, "MissingDecoderRegistration");
604 return Err(StreamDecodeError::Fatal(format!(
605 "Missing decoder registration for: {id}"
606 )));
607 }
608 }
609 }
610
611 if !components_to_store.is_empty() {
613 let mut state_guard = self.state.write().await;
614 for (id, component) in components_to_store {
615 state_guard
616 .components
617 .insert(id, component);
618 }
619 }
620
621 if !protocol_msg.snapshots.states.is_empty() {
622 info!("Decoded {} snapshots for protocol {protocol}", new_components.len());
623 }
624 if count_token_skips > 0 {
625 info!("Skipped {count_token_skips} pools due to missing tokens");
626 }
627
628 updated_states.extend(new_components);
630
631 if let Some(deltas) = protocol_msg.deltas.clone() {
633 let mut state_guard = self.state.write().await;
635
636 let mut account_update_by_address: HashMap<Address, AccountUpdate> = HashMap::new();
637 let mut new_proxy_accounts: Vec<AccountUpdate> = Vec::new();
639 for (key, value) in deltas.account_updates.iter() {
640 let mut update: AccountUpdate = value.clone().into();
641
642 if update.code.is_none() && matches!(update.change, ChangeType::Creation) {
648 error!(
649 update = ?update,
650 "FaultyCreationDelta"
651 );
652 update.change = ChangeType::Update;
653 }
654
655 if state_guard.tokens.contains_key(key) {
656 let original_address = update.address;
657 let impl_addr = match state_guard
664 .proxy_token_addresses
665 .get(&original_address)
666 {
667 Some(impl_addr) => {
668 let proxy_update = AccountUpdate { code: None, ..update.clone() };
672 account_update_by_address.insert(original_address, proxy_update);
673
674 *impl_addr
675 }
676 None => {
677 let impl_addr = generate_proxy_token_address(
682 state_guard.proxy_token_addresses.len() as u32,
683 )?;
684 state_guard
685 .proxy_token_addresses
686 .insert(original_address, impl_addr);
687
688 let proxy_state = create_proxy_token_account(
693 original_address,
694 Some(impl_addr),
695 &update.slots,
696 update.chain,
697 update.balance,
698 );
699 new_proxy_accounts.push(proxy_state);
700
701 impl_addr
702 }
703 };
704
705 if update.code.is_some() {
707 let impl_update = AccountUpdate {
708 address: impl_addr,
709 slots: HashMap::new(),
710 ..update.clone()
711 };
712 account_update_by_address.insert(impl_addr, impl_update);
713 }
714 } else {
715 account_update_by_address.insert(update.address, update);
717 }
718 }
719 drop(state_guard);
720
721 let state_guard = self.state.read().await;
722 info!("Updating engine with {} contract deltas", deltas.account_updates.len());
723 update_engine(
724 SHARED_TYCHO_DB.clone(),
725 header.clone().block(),
726 None,
727 account_update_by_address,
728 )
729 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
730
731 if !new_proxy_accounts.is_empty() {
734 SHARED_TYCHO_DB
735 .force_update_accounts(new_proxy_accounts)
736 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
737 }
738 info!("Engine updated");
739
740 let mut pools_to_update = HashSet::new();
742 for (account, _update) in deltas.account_updates {
743 pools_to_update.extend(
745 contracts_map
746 .get(&account)
747 .cloned()
748 .unwrap_or_default(),
749 );
750 pools_to_update.extend(
752 state_guard
753 .contracts_map
754 .get(&account)
755 .cloned()
756 .unwrap_or_default(),
757 );
758 }
759
760 let all_balances = Balances {
762 component_balances: deltas
763 .component_balances
764 .iter()
765 .map(|(pool_id, bals)| {
766 let mut balances = HashMap::new();
767 for (t, b) in &bals.0 {
768 balances.insert(t.clone(), b.balance.clone());
769 }
770 pools_to_update.insert(pool_id.clone());
771 (pool_id.clone(), balances)
772 })
773 .collect(),
774 account_balances: deltas
775 .account_balances
776 .iter()
777 .map(|(account, bals)| {
778 let mut balances = HashMap::new();
779 for (t, b) in bals {
780 balances.insert(t.clone(), b.balance.clone());
781 }
782 pools_to_update.extend(
783 contracts_map
784 .get(account)
785 .cloned()
786 .unwrap_or_default(),
787 );
788 (account.clone(), balances)
789 })
790 .collect(),
791 };
792
793 for (id, update) in deltas.state_updates {
795 let update_with_block =
797 Self::add_block_info_to_delta(update, current_block.clone());
798 match Self::apply_update(
799 &id,
800 update_with_block,
801 &mut updated_states,
802 &state_guard,
803 &all_balances,
804 ) {
805 Ok(_) => {
806 pools_to_update.remove(&id);
807 }
808 Err(e) => {
809 if self.skip_state_decode_failures {
810 warn!(pool = id, error = %e, "Failed to apply state update, marking component as removed");
811 updated_states.remove(&id);
813 if let Some(component) = new_pairs.remove(&id) {
815 removed_pairs.insert(id.clone(), component);
816 } else if let Some(component) = state_guard.components.get(&id) {
817 removed_pairs.insert(id.clone(), component.clone());
818 } else {
819 warn!(pool = id, "Component not found in new_pairs or state, cannot add to removed_pairs");
822 }
823 pools_to_update.remove(&id);
824
825 msg_failed_components.insert(id.clone());
827 } else {
828 return Err(e);
829 }
830 }
831 }
832 }
833
834 for pool in pools_to_update {
836 let default_delta_with_block = Self::add_block_info_to_delta(
838 ProtocolStateDelta::default(),
839 current_block.clone(),
840 );
841 match Self::apply_update(
842 &pool,
843 default_delta_with_block,
844 &mut updated_states,
845 &state_guard,
846 &all_balances,
847 ) {
848 Ok(_) => {}
849 Err(e) => {
850 if self.skip_state_decode_failures {
851 warn!(pool = pool, error = %e, "Failed to apply contract/balance update, marking component as removed");
852 updated_states.remove(&pool);
854 if let Some(component) = new_pairs.remove(&pool) {
856 removed_pairs.insert(pool.clone(), component);
857 } else if let Some(component) = state_guard.components.get(&pool) {
858 removed_pairs.insert(pool.clone(), component.clone());
859 } else {
860 warn!(pool = pool, "Component not found in new_pairs or state, cannot add to removed_pairs");
863 }
864
865 msg_failed_components.insert(pool.clone());
867 } else {
868 return Err(e);
869 }
870 }
871 }
872 }
873 };
874 }
875
876 let mut state_guard = self.state.write().await;
878
879 state_guard
881 .failed_components
882 .extend(msg_failed_components);
883
884 updated_states.retain(|id, _| {
888 !state_guard
889 .failed_components
890 .contains(id)
891 });
892 new_pairs.retain(|id, _| {
893 !state_guard
894 .failed_components
895 .contains(id)
896 });
897
898 state_guard
899 .states
900 .extend(updated_states.clone().into_iter());
901
902 for (id, component) in new_pairs.iter() {
904 state_guard
905 .components
906 .insert(id.clone(), component.clone());
907 }
908
909 for (id, _) in removed_pairs.iter() {
911 state_guard.components.remove(id);
912 }
913
914 for (key, values) in contracts_map {
915 state_guard
916 .contracts_map
917 .entry(key)
918 .or_insert_with(HashSet::new)
919 .extend(values);
920 }
921
922 Ok(Update::new(block_number_or_timestamp, updated_states, new_pairs)
924 .set_removed_pairs(removed_pairs)
925 .set_sync_states(msg.sync_states.clone()))
926 }
927
928 fn add_block_info_to_delta(
930 mut delta: ProtocolStateDelta,
931 block_header_opt: Option<BlockHeader>,
932 ) -> ProtocolStateDelta {
933 if let Some(header) = block_header_opt {
934 delta.updated_attributes.insert(
937 "block_number".to_string(),
938 Bytes::from(header.number.to_be_bytes().to_vec()),
939 );
940 delta.updated_attributes.insert(
941 "block_timestamp".to_string(),
942 Bytes::from(header.timestamp.to_be_bytes().to_vec()),
943 );
944 }
945 delta
946 }
947
948 fn apply_update(
949 id: &String,
950 update: ProtocolStateDelta,
951 updated_states: &mut HashMap<String, Box<dyn ProtocolSim>>,
952 state_guard: &RwLockReadGuard<'_, DecoderState>,
953 all_balances: &Balances,
954 ) -> Result<(), StreamDecodeError> {
955 match updated_states.entry(id.clone()) {
956 Entry::Occupied(mut entry) => {
957 let state: &mut Box<dyn ProtocolSim> = entry.get_mut();
959 state
960 .delta_transition(update, &state_guard.tokens, all_balances)
961 .map_err(|e| {
962 error!(pool = id, error = ?e, "DeltaTransitionError");
963 StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
964 })?;
965 }
966 Entry::Vacant(_) => {
967 match state_guard.states.get(id) {
968 Some(stored_state) => {
971 let mut state = stored_state.clone();
972 state
973 .delta_transition(update, &state_guard.tokens, all_balances)
974 .map_err(|e| {
975 error!(pool = id, error = ?e, "DeltaTransitionError");
976 StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
977 })?;
978 updated_states.insert(id.clone(), state);
979 }
980 None => debug!(pool = id, reason = "MissingState", "DeltaTransitionError"),
981 }
982 }
983 }
984 Ok(())
985 }
986}
987
988fn generate_proxy_token_address(idx: u32) -> Result<Address, StreamDecodeError> {
990 let padded_idx = format!("{idx:x}");
991 let padded_zeroes = "0".repeat(33 - padded_idx.len());
992 let proxy_token_address = format!("{padded_zeroes}{padded_idx}BAdbaBe");
993 let decoded = hex::decode(proxy_token_address).map_err(|e| {
994 StreamDecodeError::Fatal(format!("Invalid proxy token address encoding: {e}"))
995 })?;
996
997 const ADDRESS_LENGTH: usize = 20;
998 if decoded.len() != ADDRESS_LENGTH {
999 return Err(StreamDecodeError::Fatal(format!(
1000 "Invalid proxy token address length: expected {}, got {}",
1001 ADDRESS_LENGTH,
1002 decoded.len(),
1003 )));
1004 }
1005
1006 Ok(Address::from_slice(&decoded))
1007}
1008
1009fn create_proxy_token_account(
1014 addr: Address,
1015 new_address: Option<Address>,
1016 storage: &HashMap<U256, U256>,
1017 chain: Chain,
1018 balance: Option<U256>,
1019) -> AccountUpdate {
1020 let mut slots = storage.clone();
1021 if let Some(new_address) = new_address {
1022 slots.insert(*IMPLEMENTATION_SLOT, U256::from_be_slice(new_address.as_slice()));
1023 }
1024
1025 AccountUpdate {
1026 address: addr,
1027 chain,
1028 slots,
1029 balance,
1030 code: Some(ERC20_PROXY_BYTECODE.to_vec()),
1031 change: ChangeType::Creation,
1032 }
1033}
1034
// Test-only mock of a protocol state. `mock!` generates the `MockProtocolSim` struct with one
// inherent, expectation-driven method per signature listed below.
#[cfg(test)]
mock! {
    #[derive(Debug)]
    pub ProtocolSim {
        pub fn fee(&self) -> f64;
        pub fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError>;
        pub fn get_amount_out(
            &self,
            amount_in: BigUint,
            token_in: &Token,
            token_out: &Token,
        ) -> Result<GetAmountOutResult, SimulationError>;
        pub fn get_limits(
            &self,
            sell_token: Bytes,
            buy_token: Bytes,
        ) -> Result<(BigUint, BigUint), SimulationError>;
        pub fn delta_transition(
            &mut self,
            delta: ProtocolStateDelta,
            tokens: &HashMap<Bytes, Token>,
            balances: &Balances,
        ) -> Result<(), TransitionError>;
        pub fn clone_box(&self) -> Box<dyn ProtocolSim>;
        pub fn eq(&self, other: &dyn ProtocolSim) -> bool;
    }
}

// NOTE(review): presumably provides the serialization-related impls the mock needs in order to
// be used as a `ProtocolSim` — confirm against the macro definition.
#[cfg(test)]
crate::impl_non_serializable_protocol!(MockProtocolSim, "test protocol");
1065
/// Lets the mock be used wherever a `ProtocolSim` is expected.
///
/// The simulation methods forward to the mock's inherent method of the same name, so their
/// behaviour is whatever expectation the test configured. The `Any` accessors panic and the
/// typetag hooks are unreachable.
#[cfg(test)]
impl ProtocolSim for MockProtocolSim {
    // Forwards to the mocked `fee`.
    fn fee(&self) -> f64 {
        self.fee()
    }

    // Forwards to the mocked `spot_price`.
    fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError> {
        self.spot_price(base, quote)
    }

    // Forwards to the mocked `get_amount_out`.
    fn get_amount_out(
        &self,
        amount_in: BigUint,
        token_in: &Token,
        token_out: &Token,
    ) -> Result<GetAmountOutResult, SimulationError> {
        self.get_amount_out(amount_in, token_in, token_out)
    }

    // Forwards to the mocked `get_limits`.
    fn get_limits(
        &self,
        sell_token: Bytes,
        buy_token: Bytes,
    ) -> Result<(BigUint, BigUint), SimulationError> {
        self.get_limits(sell_token, buy_token)
    }

    // Forwards to the mocked `delta_transition`.
    fn delta_transition(
        &mut self,
        delta: ProtocolStateDelta,
        tokens: &HashMap<Bytes, Token>,
        balances: &Balances,
    ) -> Result<(), TransitionError> {
        self.delta_transition(delta, tokens, balances)
    }

    // Forwards to the mocked `clone_box`; tests decide what a "clone" of the mock is.
    fn clone_box(&self) -> Box<dyn ProtocolSim> {
        self.clone_box()
    }

    // Downcasting is not supported by the mock.
    fn as_any(&self) -> &dyn Any {
        panic!("MockProtocolSim does not support as_any")
    }

    // Downcasting is not supported by the mock.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        panic!("MockProtocolSim does not support as_any_mut")
    }

    // Forwards to the mocked `eq`.
    fn eq(&self, other: &dyn ProtocolSim) -> bool {
        self.eq(other)
    }

    fn typetag_name(&self) -> &'static str {
        unreachable!()
    }

    fn typetag_deserialize(&self) {
        unreachable!()
    }
}
1126
#[cfg(test)]
mod tests {
    use std::{fs, path::Path, str::FromStr};

    use alloy::primitives::address;
    use mockall::predicate::*;
    use rstest::*;
    use tycho_client::feed::BlockHeader;
    use tycho_common::{models::Chain, Bytes};

    use super::*;
    use crate::evm::protocol::uniswap_v2::state::UniswapV2State;

    /// Builds one token fixture per address; the symbol is the address printed in hex.
    fn token_fixtures(addresses: &[&str]) -> HashMap<Bytes, Token> {
        addresses
            .iter()
            .map(|raw| {
                let addr = Bytes::from(*raw).lpad(20, 0);
                let symbol = format!("{addr:x}");
                let token =
                    Token::new(&addr, &symbol, 18, 100, &[Some(100_000)], Chain::Ethereum, 100);
                (addr, token)
            })
            .collect()
    }

    /// Returns a decoder with the Uniswap V2 decoder registered and, if requested, both tokens
    /// of the Uniswap V2 test pool preloaded.
    async fn setup_decoder(set_tokens: bool) -> TychoStreamDecoder<BlockHeader> {
        let mut decoder = TychoStreamDecoder::new();
        decoder.register_decoder::<UniswapV2State>("uniswap_v2");
        if set_tokens {
            let tokens = token_fixtures(&[
                "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
                "0xdac17f958d2ee523a2206206994597c13d831ec7",
            ]);
            decoder.set_tokens(tokens).await;
        }
        decoder
    }

    /// Loads `tests/assets/decoder/{name}.json` and deserializes it into a feed message.
    fn load_test_msg(name: &str) -> FeedMessage<BlockHeader> {
        let asset_path = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join(format!("tests/assets/decoder/{name}.json"));
        let json_data = fs::read_to_string(asset_path).expect("Failed to read test asset");
        serde_json::from_str(&json_data).expect("Failed to deserialize FeedMsg json!")
    }

    #[tokio::test]
    async fn test_decode() {
        let decoder = setup_decoder(true).await;

        let snapshot_msg = load_test_msg("uniswap_v2_snapshot");
        let res1 = decoder
            .decode(&snapshot_msg)
            .await
            .expect("decode failure");
        let delta_msg = load_test_msg("uniswap_v2_delta");
        let res2 = decoder
            .decode(&delta_msg)
            .await
            .expect("decode failure");

        assert_eq!(res1.states.len(), 1);
        assert_eq!(res2.states.len(), 1);
        assert_eq!(res1.sync_states.len(), 1);
        assert_eq!(res2.sync_states.len(), 1);
    }

    #[tokio::test]
    async fn test_decode_component_missing_token() {
        // Only one of the pool's two tokens is known, so the pool must be skipped.
        let decoder = setup_decoder(false).await;
        decoder
            .set_tokens(token_fixtures(&["0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"]))
            .await;

        let msg = load_test_msg("uniswap_v2_snapshot");
        let res1 = decoder
            .decode(&msg)
            .await
            .expect("decode failure");

        assert_eq!(res1.states.len(), 0);
    }

    #[tokio::test]
    async fn test_decode_component_bad_id() {
        let decoder = setup_decoder(true).await;
        let msg = load_test_msg("uniswap_v2_snapshot_broken_id");

        let Err(StreamDecodeError::Fatal(reason)) = decoder.decode(&msg).await else {
            panic!("Expected failures to be raised")
        };
        assert_eq!(reason, "Component id mismatch");
    }

    #[rstest]
    #[case(true)]
    #[case(false)]
    #[tokio::test]
    async fn test_decode_component_bad_state(#[case] skip_failures: bool) {
        let mut decoder = setup_decoder(true).await;
        decoder.skip_state_decode_failures(skip_failures);

        let msg = load_test_msg("uniswap_v2_snapshot_broken_state");
        match (decoder.decode(&msg).await, skip_failures) {
            // Failures are fatal: the decoder's error must surface.
            (Err(StreamDecodeError::Fatal(msg)), false) => {
                assert_eq!(msg, "Missing attributes reserve0");
            }
            (Ok(_), false) => panic!("Expected failures to be raised"),
            // Failures are skipped: decoding succeeds without the broken pool.
            (Err(StreamDecodeError::Fatal(msg)), true) => {
                panic!("Expected failures to be ignored. Err: {msg}")
            }
            (Ok(res), true) => assert_eq!(res.states.len(), 0),
        }
    }

    #[tokio::test]
    async fn test_decode_updates_state_on_contract_change() {
        let decoder = setup_decoder(true).await;

        // The stored state must be cloned exactly once. That clone must receive exactly one
        // delta transition and is itself cloned once more afterwards.
        let mut mock_state = MockProtocolSim::new();
        mock_state
            .expect_clone_box()
            .times(1)
            .returning(|| {
                let mut transitioned = MockProtocolSim::new();
                transitioned
                    .expect_delta_transition()
                    .times(1)
                    .returning(|_, _, _| Ok(()));
                transitioned
                    .expect_clone_box()
                    .times(1)
                    .returning(|| Box::new(MockProtocolSim::new()));
                Box::new(transitioned)
            });

        let pool_id =
            "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2".to_string();
        {
            // Seed the decoder with the mocked pool and link it to the contract.
            let mut state = decoder.state.write().await;
            state
                .states
                .insert(pool_id.clone(), Box::new(mock_state) as Box<dyn ProtocolSim>);
            state.contracts_map.insert(
                Bytes::from("0xba12222222228d8ba445958a75a0704d566bf2c8").lpad(20, 0),
                HashSet::from([pool_id.clone()]),
            );
        }

        let msg = load_test_msg("balancer_v2_delta");

        // The mock expectations are the assertions: they are verified when the mocks drop.
        let _ = decoder
            .decode(&msg)
            .await
            .expect("decode failure");
    }

    #[test]
    fn test_generate_proxy_token_address() {
        let cases = [
            (1, address!("000000000000000000000000000000001badbabe")),
            (123456, address!("00000000000000000000000000001e240badbabe")),
        ];

        for (idx, expected) in cases {
            let generated_address =
                generate_proxy_token_address(idx).expect("proxy token address should be valid");
            assert_eq!(generated_address, expected);
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_euler_hook_low_pool_manager_balance() {
        let mut decoder = TychoStreamDecoder::new();
        decoder.register_decoder_with_context::<crate::evm::protocol::uniswap_v4::state::UniswapV4State>(
            "uniswap_v4_hooks",
            DecoderContext::new().vm_traces(true),
        );

        let weth = Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap();
        let teth = Bytes::from_str("0xd11c452fc99cf405034ee446803b6f6c1f6d5ed8").unwrap();
        let tokens = HashMap::from([
            (
                weth.clone(),
                Token::new(&weth, "WETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
            ),
            (
                teth.clone(),
                Token::new(&teth, "tETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
            ),
        ]);
        decoder.set_tokens(tokens.clone()).await;

        let msg = load_test_msg("euler_hook_snapshot");
        let res = decoder
            .decode(&msg)
            .await
            .expect("decode failure");

        let pool_state = res
            .states
            .get("0xc70d7fbd7fcccdf726e02fed78548b40dc52502b097c7a1ee7d995f4d4396134")
            .expect("Couldn't find target pool");
        let one_teth = BigUint::from_str("1000000000000000000").unwrap();
        let amount_out = pool_state
            .get_amount_out(one_teth, tokens.get(&teth).unwrap(), tokens.get(&weth).unwrap())
            .expect("Get amount out failed");

        assert_eq!(amount_out.amount, BigUint::from_str("1216190190361759119").unwrap());
    }
}