1use std::{
2 collections::{hash_map::Entry, HashMap, HashSet},
3 future::Future,
4 pin::Pin,
5 sync::Arc,
6};
7
8use alloy::primitives::{Address, U256};
9use thiserror::Error;
10use tokio::sync::{RwLock, RwLockReadGuard};
11use tracing::{debug, error, info, warn};
12use tycho_client::feed::{synchronizer::ComponentWithState, BlockHeader, FeedMessage, HeaderLike};
13use tycho_common::{
14 dto::{ChangeType, ProtocolStateDelta},
15 models::{token::Token, Chain},
16 simulation::protocol_sim::{Balances, ProtocolSim},
17 Bytes,
18};
19#[cfg(test)]
20use {
21 mockall::mock,
22 num_bigint::BigUint,
23 std::any::Any,
24 tycho_common::simulation::{
25 errors::{SimulationError, TransitionError},
26 protocol_sim::GetAmountOutResult,
27 },
28};
29
30use crate::{
31 evm::{
32 engine_db::{update_engine, SHARED_TYCHO_DB},
33 protocol::{
34 utils::bytes_to_address,
35 vm::{constants::ERC20_PROXY_BYTECODE, erc20_token::IMPLEMENTATION_SLOT},
36 },
37 tycho_models::{AccountUpdate, ResponseAccount},
38 },
39 protocol::{
40 errors::InvalidSnapshotError,
41 models::{DecoderContext, ProtocolComponent, TryFromWithBlock, Update},
42 },
43};
44
45#[derive(Error, Debug)]
46pub enum StreamDecodeError {
47 #[error("{0}")]
48 Fatal(String),
49}
50
51#[derive(Default)]
52struct DecoderState {
53 tokens: HashMap<Bytes, Token>,
54 states: HashMap<String, Box<dyn ProtocolSim>>,
55 components: HashMap<String, ProtocolComponent>,
56 contracts_map: HashMap<Bytes, HashSet<String>>,
58 proxy_token_addresses: HashMap<Address, Address>,
60 failed_components: HashSet<String>,
64}
65
66type DecodeFut =
67 Pin<Box<dyn Future<Output = Result<Box<dyn ProtocolSim>, InvalidSnapshotError>> + Send + Sync>>;
68type AccountBalances = HashMap<Bytes, HashMap<Bytes, Bytes>>;
69type RegistryFn<H> = dyn Fn(ComponentWithState, H, AccountBalances, Arc<RwLock<DecoderState>>) -> DecodeFut
70 + Send
71 + Sync;
72type FilterFn = fn(&ComponentWithState) -> bool;
73
74pub struct TychoStreamDecoder<H>
87where
88 H: HeaderLike,
89{
90 state: Arc<RwLock<DecoderState>>,
91 skip_state_decode_failures: bool,
92 min_token_quality: u32,
93 registry: HashMap<String, Box<RegistryFn<H>>>,
94 inclusion_filters: HashMap<String, FilterFn>,
95}
96
97impl<H> Default for TychoStreamDecoder<H>
98where
99 H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
100{
101 fn default() -> Self {
102 Self::new()
103 }
104}
105
106impl<H> TychoStreamDecoder<H>
107where
108 H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
109{
110 pub fn new() -> Self {
111 Self {
112 state: Arc::new(RwLock::new(DecoderState::default())),
113 skip_state_decode_failures: false,
114 min_token_quality: 51,
115 registry: HashMap::new(),
116 inclusion_filters: HashMap::new(),
117 }
118 }
119
120 pub async fn set_tokens(&self, tokens: HashMap<Bytes, Token>) {
125 let mut guard = self.state.write().await;
126 guard.tokens = tokens;
127 }
128
129 pub fn skip_state_decode_failures(&mut self, skip: bool) {
130 self.skip_state_decode_failures = skip;
131 }
132
133 pub fn register_decoder_with_context<T>(&mut self, exchange: &str, context: DecoderContext)
146 where
147 T: ProtocolSim
148 + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
149 + Send
150 + 'static,
151 {
152 let decoder = Box::new(
153 move |component: ComponentWithState,
154 header: H,
155 account_balances: AccountBalances,
156 state: Arc<RwLock<DecoderState>>| {
157 let context = context.clone();
158 Box::pin(async move {
159 let guard = state.read().await;
160 T::try_from_with_header(
161 component,
162 header,
163 &account_balances,
164 &guard.tokens,
165 &context,
166 )
167 .await
168 .map(|c| Box::new(c) as Box<dyn ProtocolSim>)
169 }) as DecodeFut
170 },
171 );
172 self.registry
173 .insert(exchange.to_string(), decoder);
174 }
175
176 pub fn register_decoder<T>(&mut self, exchange: &str)
188 where
189 T: ProtocolSim
190 + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
191 + Send
192 + 'static,
193 {
194 let context = DecoderContext::new();
195 self.register_decoder_with_context::<T>(exchange, context);
196 }
197
198 pub fn register_filter(&mut self, exchange: &str, predicate: FilterFn) {
214 self.inclusion_filters
215 .insert(exchange.to_string(), predicate);
216 }
217
218 pub async fn decode(&self, msg: &FeedMessage<H>) -> Result<Update, StreamDecodeError> {
221 let mut updated_states = HashMap::new();
223 let mut new_pairs = HashMap::new();
224 let mut removed_pairs = HashMap::new();
225 let mut contracts_map = HashMap::new();
226 let mut msg_failed_components = HashSet::new();
227
228 let header = msg
229 .state_msgs
230 .values()
231 .next()
232 .ok_or_else(|| StreamDecodeError::Fatal("Missing block!".into()))?
233 .header
234 .clone();
235
236 let block_number_or_timestamp = header
237 .clone()
238 .block_number_or_timestamp();
239 let current_block = header.clone().block();
240
241 for (protocol, protocol_msg) in msg.state_msgs.iter() {
242 if let Some(deltas) = protocol_msg.deltas.as_ref() {
244 let mut state_guard = self.state.write().await;
245
246 let new_tokens = deltas
247 .new_tokens
248 .iter()
249 .filter(|(addr, t)| {
250 t.quality >= self.min_token_quality &&
251 !state_guard.tokens.contains_key(*addr)
252 })
253 .filter_map(|(addr, t)| {
254 t.clone()
255 .try_into()
256 .map(|token| (addr.clone(), token))
257 .inspect_err(|e| {
258 warn!("Failed decoding token {e:?} {addr:#044x}");
259 *e
260 })
261 .ok()
262 })
263 .collect::<HashMap<Bytes, Token>>();
264
265 if !new_tokens.is_empty() {
266 debug!(n = new_tokens.len(), "NewTokens");
267 state_guard.tokens.extend(new_tokens);
268 }
269 }
270
271 {
273 let mut state_guard = self.state.write().await;
274 let removed_components: Vec<(String, ProtocolComponent)> = protocol_msg
275 .removed_components
276 .iter()
277 .map(|(id, comp)| {
278 if *id != comp.id {
279 error!(
280 "Component id mismatch in removed components {id} != {}",
281 comp.id
282 );
283 return Err(StreamDecodeError::Fatal("Component id mismatch".into()));
284 }
285
286 let tokens = comp
287 .tokens
288 .iter()
289 .flat_map(|addr| state_guard.tokens.get(addr).cloned())
290 .collect::<Vec<_>>();
291
292 if tokens.len() == comp.tokens.len() {
293 Ok(Some((
294 id.clone(),
295 ProtocolComponent::from_with_tokens(comp.clone(), tokens),
296 )))
297 } else {
298 Ok(None)
299 }
300 })
301 .collect::<Result<Vec<Option<(String, ProtocolComponent)>>, StreamDecodeError>>(
302 )?
303 .into_iter()
304 .flatten()
305 .collect();
306
307 for (id, component) in removed_components {
309 state_guard.components.remove(&id);
310 state_guard.states.remove(&id);
311 removed_pairs.insert(id, component);
312 }
313
314 info!(
316 "Processing {} contracts from snapshots",
317 protocol_msg
318 .snapshots
319 .get_vm_storage()
320 .len()
321 );
322
323 let mut proxy_token_accounts: HashMap<Address, AccountUpdate> = HashMap::new();
324 let mut storage_by_address: HashMap<Address, ResponseAccount> = HashMap::new();
325 for (key, value) in protocol_msg
326 .snapshots
327 .get_vm_storage()
328 .iter()
329 {
330 let account: ResponseAccount = value.clone().into();
331
332 if state_guard.tokens.contains_key(key) {
333 let original_address = account.address;
334 let (impl_addr, proxy_state) = match state_guard
343 .proxy_token_addresses
344 .get(&original_address)
345 {
346 Some(impl_addr) => {
347 let proxy_state = AccountUpdate::new(
354 original_address,
355 value.chain.into(),
356 account.slots.clone(),
357 Some(account.native_balance),
358 None,
359 ChangeType::Update,
360 );
361 (*impl_addr, proxy_state)
362 }
363 None => {
364 let impl_addr = generate_proxy_token_address(
368 state_guard.proxy_token_addresses.len() as u32,
369 )?;
370 state_guard
371 .proxy_token_addresses
372 .insert(original_address, impl_addr);
373
374 let proxy_state = create_proxy_token_account(
376 original_address,
377 Some(impl_addr),
378 &account.slots,
379 value.chain.into(),
380 Some(account.native_balance),
381 );
382
383 (impl_addr, proxy_state)
384 }
385 };
386
387 proxy_token_accounts.insert(original_address, proxy_state);
388
389 let impl_update = ResponseAccount {
391 address: impl_addr,
392 slots: HashMap::new(),
393 ..account.clone()
394 };
395 storage_by_address.insert(impl_addr, impl_update);
396 } else {
397 storage_by_address.insert(account.address, account);
399 }
400 }
401
402 let mut proxy_creates: Vec<AccountUpdate> = Vec::new();
406 let mut proxy_updates: HashMap<Address, AccountUpdate> = HashMap::new();
407 for (addr, update) in proxy_token_accounts {
408 if matches!(update.change, ChangeType::Creation) {
409 proxy_creates.push(update);
410 } else {
411 proxy_updates.insert(addr, update);
412 }
413 }
414
415 info!("Updating engine with {} contracts from snapshots", storage_by_address.len());
416 update_engine(
417 SHARED_TYCHO_DB.clone(),
418 header.clone().block(),
419 Some(storage_by_address),
420 proxy_updates,
421 )
422 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
423
424 if !proxy_creates.is_empty() {
428 SHARED_TYCHO_DB
429 .force_update_accounts(proxy_creates)
430 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
431 }
432 info!("Engine updated");
433 drop(state_guard);
434 }
435
436 let account_balances = protocol_msg
439 .clone()
440 .snapshots
441 .get_vm_storage()
442 .iter()
443 .filter_map(|(addr, acc)| {
444 let balances = acc.token_balances.clone();
445 if balances.is_empty() {
446 return None;
447 }
448 Some((addr.clone(), balances))
449 })
450 .collect::<AccountBalances>();
451
452 let mut new_components = HashMap::new();
453 let mut count_token_skips = 0;
454 let mut components_to_store = HashMap::new();
455 {
456 let state_guard = self.state.read().await;
457
458 'snapshot_loop: for (id, snapshot) in protocol_msg
460 .snapshots
461 .get_states()
462 .clone()
463 {
464 if self
466 .inclusion_filters
467 .get(protocol.as_str())
468 .is_some_and(|predicate| !predicate(&snapshot))
469 {
470 continue;
471 }
472
473 let mut component_tokens = Vec::new();
475 let mut new_tokens_accounts = HashMap::new();
476 for token in snapshot.component.tokens.clone() {
477 match state_guard.tokens.get(&token) {
478 Some(token) => {
479 component_tokens.push(token.clone());
480
481 let token_address = match bytes_to_address(&token.address) {
484 Ok(addr) => addr,
485 Err(_) => {
486 count_token_skips += 1;
487 msg_failed_components.insert(id.clone());
488 warn!(
489 "Token address could not be decoded {}, ignoring pool {:x?}",
490 token.address, id
491 );
492 continue 'snapshot_loop;
493 }
494 };
495 if !state_guard
497 .proxy_token_addresses
498 .contains_key(&token_address)
499 {
500 new_tokens_accounts.insert(
501 token_address,
502 create_proxy_token_account(
503 token_address,
504 None,
505 &HashMap::new(),
506 snapshot.component.chain.into(),
507 None,
508 ),
509 );
510 }
511 }
512 None => {
513 count_token_skips += 1;
514 msg_failed_components.insert(id.clone());
515 debug!("Token not found {}, ignoring pool {:x?}", token, id);
516 continue 'snapshot_loop;
517 }
518 }
519 }
520 let component = ProtocolComponent::from_with_tokens(
521 snapshot.component.clone(),
522 component_tokens,
523 );
524
525 if !new_tokens_accounts.is_empty() {
527 update_engine(
528 SHARED_TYCHO_DB.clone(),
529 header.clone().block(),
530 None,
531 new_tokens_accounts,
532 )
533 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
534 }
535
536 if !component
539 .static_attributes
540 .contains_key("manual_updates")
541 {
542 for contract in &component.contract_ids {
543 contracts_map
544 .entry(contract.clone())
545 .or_insert_with(HashSet::new)
546 .insert(id.clone());
547 }
548 for (_, tracing) in snapshot.entrypoints.iter() {
551 for contract in tracing.accessed_slots.keys().cloned() {
552 contracts_map
553 .entry(contract)
554 .or_insert_with(HashSet::new)
555 .insert(id.clone());
556 }
557 }
558 }
559
560 new_pairs.insert(id.clone(), component.clone());
562
563 components_to_store.insert(id.clone(), component);
565
566 if let Some(state_decode_f) = self.registry.get(protocol.as_str()) {
568 match state_decode_f(
569 snapshot,
570 header.clone(),
571 account_balances.clone(),
572 self.state.clone(),
573 )
574 .await
575 {
576 Ok(state) => {
577 new_components.insert(id.clone(), state);
578 }
579 Err(e) => {
580 if self.skip_state_decode_failures {
581 warn!(pool = id, error = %e, "StateDecodingFailure");
582 msg_failed_components.insert(id.clone());
583 continue 'snapshot_loop;
584 } else {
585 error!(pool = id, error = %e, "StateDecodingFailure");
586 return Err(StreamDecodeError::Fatal(format!("{e}")));
587 }
588 }
589 }
590 } else if self.skip_state_decode_failures {
591 warn!(pool = id, "MissingDecoderRegistration");
592 msg_failed_components.insert(id.clone());
593 continue 'snapshot_loop;
594 } else {
595 error!(pool = id, "MissingDecoderRegistration");
596 return Err(StreamDecodeError::Fatal(format!(
597 "Missing decoder registration for: {id}"
598 )));
599 }
600 }
601 }
602
603 if !components_to_store.is_empty() {
605 let mut state_guard = self.state.write().await;
606 for (id, component) in components_to_store {
607 state_guard
608 .components
609 .insert(id, component);
610 }
611 }
612
613 if !protocol_msg.snapshots.states.is_empty() {
614 info!("Decoded {} snapshots for protocol {protocol}", new_components.len());
615 }
616 if count_token_skips > 0 {
617 info!("Skipped {count_token_skips} pools due to missing tokens");
618 }
619
620 updated_states.extend(new_components);
622
623 if let Some(deltas) = protocol_msg.deltas.clone() {
625 let mut state_guard = self.state.write().await;
627
628 let mut account_update_by_address: HashMap<Address, AccountUpdate> = HashMap::new();
629 let mut new_proxy_accounts: Vec<AccountUpdate> = Vec::new();
631 for (key, value) in deltas.account_updates.iter() {
632 let mut update: AccountUpdate = value.clone().into();
633
634 if update.code.is_none() && matches!(update.change, ChangeType::Creation) {
640 error!(
641 update = ?update,
642 "FaultyCreationDelta"
643 );
644 update.change = ChangeType::Update;
645 }
646
647 if state_guard.tokens.contains_key(key) {
648 let original_address = update.address;
649 let impl_addr = match state_guard
656 .proxy_token_addresses
657 .get(&original_address)
658 {
659 Some(impl_addr) => {
660 let proxy_update = AccountUpdate { code: None, ..update.clone() };
664 account_update_by_address.insert(original_address, proxy_update);
665
666 *impl_addr
667 }
668 None => {
669 let impl_addr = generate_proxy_token_address(
674 state_guard.proxy_token_addresses.len() as u32,
675 )?;
676 state_guard
677 .proxy_token_addresses
678 .insert(original_address, impl_addr);
679
680 let proxy_state = create_proxy_token_account(
685 original_address,
686 Some(impl_addr),
687 &update.slots,
688 update.chain,
689 update.balance,
690 );
691 new_proxy_accounts.push(proxy_state);
692
693 impl_addr
694 }
695 };
696
697 if update.code.is_some() {
699 let impl_update = AccountUpdate {
700 address: impl_addr,
701 slots: HashMap::new(),
702 ..update.clone()
703 };
704 account_update_by_address.insert(impl_addr, impl_update);
705 }
706 } else {
707 account_update_by_address.insert(update.address, update);
709 }
710 }
711 drop(state_guard);
712
713 let state_guard = self.state.read().await;
714 info!("Updating engine with {} contract deltas", deltas.account_updates.len());
715 update_engine(
716 SHARED_TYCHO_DB.clone(),
717 header.clone().block(),
718 None,
719 account_update_by_address,
720 )
721 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
722
723 if !new_proxy_accounts.is_empty() {
726 SHARED_TYCHO_DB
727 .force_update_accounts(new_proxy_accounts)
728 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
729 }
730 info!("Engine updated");
731
732 let mut pools_to_update = HashSet::new();
734 for (account, _update) in deltas.account_updates {
735 pools_to_update.extend(
737 contracts_map
738 .get(&account)
739 .cloned()
740 .unwrap_or_default(),
741 );
742 pools_to_update.extend(
744 state_guard
745 .contracts_map
746 .get(&account)
747 .cloned()
748 .unwrap_or_default(),
749 );
750 }
751
752 let all_balances = Balances {
754 component_balances: deltas
755 .component_balances
756 .iter()
757 .map(|(pool_id, bals)| {
758 let mut balances = HashMap::new();
759 for (t, b) in &bals.0 {
760 balances.insert(t.clone(), b.balance.clone());
761 }
762 pools_to_update.insert(pool_id.clone());
763 (pool_id.clone(), balances)
764 })
765 .collect(),
766 account_balances: deltas
767 .account_balances
768 .iter()
769 .map(|(account, bals)| {
770 let mut balances = HashMap::new();
771 for (t, b) in bals {
772 balances.insert(t.clone(), b.balance.clone());
773 }
774 pools_to_update.extend(
775 contracts_map
776 .get(account)
777 .cloned()
778 .unwrap_or_default(),
779 );
780 (account.clone(), balances)
781 })
782 .collect(),
783 };
784
785 for (id, update) in deltas.state_updates {
787 let update_with_block =
789 Self::add_block_info_to_delta(update, current_block.clone());
790 match Self::apply_update(
791 &id,
792 update_with_block,
793 &mut updated_states,
794 &state_guard,
795 &all_balances,
796 ) {
797 Ok(_) => {
798 pools_to_update.remove(&id);
799 }
800 Err(e) => {
801 if self.skip_state_decode_failures {
802 warn!(pool = id, error = %e, "Failed to apply state update, marking component as removed");
803 updated_states.remove(&id);
805 if let Some(component) = new_pairs.remove(&id) {
807 removed_pairs.insert(id.clone(), component);
808 } else if let Some(component) = state_guard.components.get(&id) {
809 removed_pairs.insert(id.clone(), component.clone());
810 } else {
811 warn!(pool = id, "Component not found in new_pairs or state, cannot add to removed_pairs");
814 }
815 pools_to_update.remove(&id);
816
817 msg_failed_components.insert(id.clone());
819 } else {
820 return Err(e);
821 }
822 }
823 }
824 }
825
826 for pool in pools_to_update {
828 let default_delta_with_block = Self::add_block_info_to_delta(
830 ProtocolStateDelta::default(),
831 current_block.clone(),
832 );
833 match Self::apply_update(
834 &pool,
835 default_delta_with_block,
836 &mut updated_states,
837 &state_guard,
838 &all_balances,
839 ) {
840 Ok(_) => {}
841 Err(e) => {
842 if self.skip_state_decode_failures {
843 warn!(pool = pool, error = %e, "Failed to apply contract/balance update, marking component as removed");
844 updated_states.remove(&pool);
846 if let Some(component) = new_pairs.remove(&pool) {
848 removed_pairs.insert(pool.clone(), component);
849 } else if let Some(component) = state_guard.components.get(&pool) {
850 removed_pairs.insert(pool.clone(), component.clone());
851 } else {
852 warn!(pool = pool, "Component not found in new_pairs or state, cannot add to removed_pairs");
855 }
856
857 msg_failed_components.insert(pool.clone());
859 } else {
860 return Err(e);
861 }
862 }
863 }
864 }
865 };
866 }
867
868 let mut state_guard = self.state.write().await;
870
871 state_guard
873 .failed_components
874 .extend(msg_failed_components);
875
876 updated_states.retain(|id, _| {
880 !state_guard
881 .failed_components
882 .contains(id)
883 });
884 new_pairs.retain(|id, _| {
885 !state_guard
886 .failed_components
887 .contains(id)
888 });
889
890 state_guard
891 .states
892 .extend(updated_states.clone().into_iter());
893
894 for (id, component) in new_pairs.iter() {
896 state_guard
897 .components
898 .insert(id.clone(), component.clone());
899 }
900
901 for (id, _) in removed_pairs.iter() {
903 state_guard.components.remove(id);
904 }
905
906 for (key, values) in contracts_map {
907 state_guard
908 .contracts_map
909 .entry(key)
910 .or_insert_with(HashSet::new)
911 .extend(values);
912 }
913
914 Ok(Update::new(block_number_or_timestamp, updated_states, new_pairs)
916 .set_removed_pairs(removed_pairs)
917 .set_sync_states(msg.sync_states.clone()))
918 }
919
920 fn add_block_info_to_delta(
922 mut delta: ProtocolStateDelta,
923 block_header_opt: Option<BlockHeader>,
924 ) -> ProtocolStateDelta {
925 if let Some(header) = block_header_opt {
926 delta.updated_attributes.insert(
929 "block_number".to_string(),
930 Bytes::from(header.number.to_be_bytes().to_vec()),
931 );
932 delta.updated_attributes.insert(
933 "block_timestamp".to_string(),
934 Bytes::from(header.timestamp.to_be_bytes().to_vec()),
935 );
936 }
937 delta
938 }
939
940 fn apply_update(
941 id: &String,
942 update: ProtocolStateDelta,
943 updated_states: &mut HashMap<String, Box<dyn ProtocolSim>>,
944 state_guard: &RwLockReadGuard<'_, DecoderState>,
945 all_balances: &Balances,
946 ) -> Result<(), StreamDecodeError> {
947 match updated_states.entry(id.clone()) {
948 Entry::Occupied(mut entry) => {
949 let state: &mut Box<dyn ProtocolSim> = entry.get_mut();
951 state
952 .delta_transition(update, &state_guard.tokens, all_balances)
953 .map_err(|e| {
954 error!(pool = id, error = ?e, "DeltaTransitionError");
955 StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
956 })?;
957 }
958 Entry::Vacant(_) => {
959 match state_guard.states.get(id) {
960 Some(stored_state) => {
963 let mut state = stored_state.clone();
964 state
965 .delta_transition(update, &state_guard.tokens, all_balances)
966 .map_err(|e| {
967 error!(pool = id, error = ?e, "DeltaTransitionError");
968 StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
969 })?;
970 updated_states.insert(id.clone(), state);
971 }
972 None => debug!(pool = id, reason = "MissingState", "DeltaTransitionError"),
973 }
974 }
975 }
976 Ok(())
977 }
978}
979
980fn generate_proxy_token_address(idx: u32) -> Result<Address, StreamDecodeError> {
982 let padded_idx = format!("{idx:x}");
983 let padded_zeroes = "0".repeat(33 - padded_idx.len());
984 let proxy_token_address = format!("{padded_zeroes}{padded_idx}BAdbaBe");
985 let decoded = hex::decode(proxy_token_address).map_err(|e| {
986 StreamDecodeError::Fatal(format!("Invalid proxy token address encoding: {e}"))
987 })?;
988
989 const ADDRESS_LENGTH: usize = 20;
990 if decoded.len() != ADDRESS_LENGTH {
991 return Err(StreamDecodeError::Fatal(format!(
992 "Invalid proxy token address length: expected {}, got {}",
993 ADDRESS_LENGTH,
994 decoded.len(),
995 )));
996 }
997
998 Ok(Address::from_slice(&decoded))
999}
1000
1001fn create_proxy_token_account(
1006 addr: Address,
1007 new_address: Option<Address>,
1008 storage: &HashMap<U256, U256>,
1009 chain: Chain,
1010 balance: Option<U256>,
1011) -> AccountUpdate {
1012 let mut slots = storage.clone();
1013 if let Some(new_address) = new_address {
1014 slots.insert(*IMPLEMENTATION_SLOT, U256::from_be_slice(new_address.as_slice()));
1015 }
1016
1017 AccountUpdate {
1018 address: addr,
1019 chain,
1020 slots,
1021 balance,
1022 code: Some(ERC20_PROXY_BYTECODE.to_vec()),
1023 change: ChangeType::Creation,
1024 }
1025}
1026
1027#[cfg(test)]
1028mock! {
1029 #[derive(Debug)]
1030 pub ProtocolSim {
1031 pub fn fee(&self) -> f64;
1032 pub fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError>;
1033 pub fn get_amount_out(
1034 &self,
1035 amount_in: BigUint,
1036 token_in: &Token,
1037 token_out: &Token,
1038 ) -> Result<GetAmountOutResult, SimulationError>;
1039 pub fn get_limits(
1040 &self,
1041 sell_token: Bytes,
1042 buy_token: Bytes,
1043 ) -> Result<(BigUint, BigUint), SimulationError>;
1044 pub fn delta_transition(
1045 &mut self,
1046 delta: ProtocolStateDelta,
1047 tokens: &HashMap<Bytes, Token>,
1048 balances: &Balances,
1049 ) -> Result<(), TransitionError>;
1050 pub fn clone_box(&self) -> Box<dyn ProtocolSim>;
1051 pub fn eq(&self, other: &dyn ProtocolSim) -> bool;
1052 }
1053}
1054
1055#[cfg(test)]
1056crate::impl_non_serializable_protocol!(MockProtocolSim, "test protocol");
1057
1058#[cfg(test)]
1059impl ProtocolSim for MockProtocolSim {
1060 fn fee(&self) -> f64 {
1061 self.fee()
1062 }
1063
1064 fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError> {
1065 self.spot_price(base, quote)
1066 }
1067
1068 fn get_amount_out(
1069 &self,
1070 amount_in: BigUint,
1071 token_in: &Token,
1072 token_out: &Token,
1073 ) -> Result<GetAmountOutResult, SimulationError> {
1074 self.get_amount_out(amount_in, token_in, token_out)
1075 }
1076
1077 fn get_limits(
1078 &self,
1079 sell_token: Bytes,
1080 buy_token: Bytes,
1081 ) -> Result<(BigUint, BigUint), SimulationError> {
1082 self.get_limits(sell_token, buy_token)
1083 }
1084
1085 fn delta_transition(
1086 &mut self,
1087 delta: ProtocolStateDelta,
1088 tokens: &HashMap<Bytes, Token>,
1089 balances: &Balances,
1090 ) -> Result<(), TransitionError> {
1091 self.delta_transition(delta, tokens, balances)
1092 }
1093
1094 fn clone_box(&self) -> Box<dyn ProtocolSim> {
1095 self.clone_box()
1096 }
1097
1098 fn as_any(&self) -> &dyn Any {
1099 panic!("MockProtocolSim does not support as_any")
1100 }
1101
1102 fn as_any_mut(&mut self) -> &mut dyn Any {
1103 panic!("MockProtocolSim does not support as_any_mut")
1104 }
1105
1106 fn eq(&self, other: &dyn ProtocolSim) -> bool {
1107 self.eq(other)
1108 }
1109
1110 fn typetag_name(&self) -> &'static str {
1111 unreachable!()
1112 }
1113
1114 fn typetag_deserialize(&self) {
1115 unreachable!()
1116 }
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121 use std::{fs, path::Path, str::FromStr};
1122
1123 use alloy::primitives::address;
1124 use mockall::predicate::*;
1125 use rstest::*;
1126 use tycho_client::feed::BlockHeader;
1127 use tycho_common::{models::Chain, Bytes};
1128
1129 use super::*;
1130 use crate::evm::protocol::uniswap_v2::state::UniswapV2State;
1131
1132 async fn setup_decoder(set_tokens: bool) -> TychoStreamDecoder<BlockHeader> {
1133 let mut decoder = TychoStreamDecoder::new();
1134 decoder.register_decoder::<UniswapV2State>("uniswap_v2");
1135 if set_tokens {
1136 let tokens = [
1137 Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0),
1138 Bytes::from("0xdac17f958d2ee523a2206206994597c13d831ec7").lpad(20, 0),
1139 ]
1140 .iter()
1141 .map(|addr| {
1142 let addr_str = format!("{addr:x}");
1143 (
1144 addr.clone(),
1145 Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1146 )
1147 })
1148 .collect();
1149 decoder.set_tokens(tokens).await;
1150 }
1151 decoder
1152 }
1153
1154 fn load_test_msg(name: &str) -> FeedMessage<BlockHeader> {
1155 let project_root = env!("CARGO_MANIFEST_DIR");
1156 let asset_path = Path::new(project_root).join(format!("tests/assets/decoder/{name}.json"));
1157 let json_data = fs::read_to_string(asset_path).expect("Failed to read test asset");
1158 serde_json::from_str(&json_data).expect("Failed to deserialize FeedMsg json!")
1159 }
1160
1161 #[tokio::test]
1162 async fn test_decode() {
1163 let decoder = setup_decoder(true).await;
1164
1165 let msg = load_test_msg("uniswap_v2_snapshot");
1166 let res1 = decoder
1167 .decode(&msg)
1168 .await
1169 .expect("decode failure");
1170 let msg = load_test_msg("uniswap_v2_delta");
1171 let res2 = decoder
1172 .decode(&msg)
1173 .await
1174 .expect("decode failure");
1175
1176 assert_eq!(res1.states.len(), 1);
1177 assert_eq!(res2.states.len(), 1);
1178 assert_eq!(res1.sync_states.len(), 1);
1179 assert_eq!(res2.sync_states.len(), 1);
1180 }
1181
1182 #[tokio::test]
1183 async fn test_decode_component_missing_token() {
1184 let decoder = setup_decoder(false).await;
1185 let tokens = [Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0)]
1186 .iter()
1187 .map(|addr| {
1188 let addr_str = format!("{addr:x}");
1189 (
1190 addr.clone(),
1191 Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1192 )
1193 })
1194 .collect();
1195 decoder.set_tokens(tokens).await;
1196
1197 let msg = load_test_msg("uniswap_v2_snapshot");
1198 let res1 = decoder
1199 .decode(&msg)
1200 .await
1201 .expect("decode failure");
1202
1203 assert_eq!(res1.states.len(), 0);
1204 }
1205
1206 #[tokio::test]
1207 async fn test_decode_component_bad_id() {
1208 let decoder = setup_decoder(true).await;
1209 let msg = load_test_msg("uniswap_v2_snapshot_broken_id");
1210
1211 match decoder.decode(&msg).await {
1212 Err(StreamDecodeError::Fatal(msg)) => {
1213 assert_eq!(msg, "Component id mismatch");
1214 }
1215 Ok(_) => {
1216 panic!("Expected failures to be raised")
1217 }
1218 }
1219 }
1220
1221 #[rstest]
1222 #[case(true)]
1223 #[case(false)]
1224 #[tokio::test]
1225 async fn test_decode_component_bad_state(#[case] skip_failures: bool) {
1226 let mut decoder = setup_decoder(true).await;
1227 decoder.skip_state_decode_failures = skip_failures;
1228
1229 let msg = load_test_msg("uniswap_v2_snapshot_broken_state");
1230 match decoder.decode(&msg).await {
1231 Err(StreamDecodeError::Fatal(msg)) => {
1232 if !skip_failures {
1233 assert_eq!(msg, "Missing attributes reserve0");
1234 } else {
1235 panic!("Expected failures to be ignored. Err: {msg}")
1236 }
1237 }
1238 Ok(res) => {
1239 if !skip_failures {
1240 panic!("Expected failures to be raised")
1241 } else {
1242 assert_eq!(res.states.len(), 0);
1243 }
1244 }
1245 }
1246 }
1247
1248 #[tokio::test]
1249 async fn test_decode_updates_state_on_contract_change() {
1250 let decoder = setup_decoder(true).await;
1251
1252 let mut mock_state = MockProtocolSim::new();
1254
1255 mock_state
1256 .expect_clone_box()
1257 .times(1)
1258 .returning(|| {
1259 let mut cloned_mock_state = MockProtocolSim::new();
1260 cloned_mock_state
1262 .expect_delta_transition()
1263 .times(1)
1264 .returning(|_, _, _| Ok(()));
1265 cloned_mock_state
1266 .expect_clone_box()
1267 .times(1)
1268 .returning(|| Box::new(MockProtocolSim::new()));
1269 Box::new(cloned_mock_state)
1270 });
1271
1272 let pool_id =
1274 "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2".to_string();
1275 decoder
1276 .state
1277 .write()
1278 .await
1279 .states
1280 .insert(pool_id.clone(), Box::new(mock_state) as Box<dyn ProtocolSim>);
1281 decoder
1282 .state
1283 .write()
1284 .await
1285 .contracts_map
1286 .insert(
1287 Bytes::from("0xba12222222228d8ba445958a75a0704d566bf2c8").lpad(20, 0),
1288 HashSet::from([pool_id.clone()]),
1289 );
1290
1291 let msg = load_test_msg("balancer_v2_delta");
1293
1294 let _ = decoder
1296 .decode(&msg)
1297 .await
1298 .expect("decode failure");
1299
1300 }
1302
1303 #[test]
1304 fn test_generate_proxy_token_address() {
1305 let idx = 1;
1306 let generated_address =
1307 generate_proxy_token_address(idx).expect("proxy token address should be valid");
1308 assert_eq!(generated_address, address!("000000000000000000000000000000001badbabe"));
1309
1310 let idx = 123456;
1311 let generated_address =
1312 generate_proxy_token_address(idx).expect("proxy token address should be valid");
1313 assert_eq!(generated_address, address!("00000000000000000000000000001e240badbabe"));
1314 }
1315
1316 #[tokio::test(flavor = "multi_thread")]
1317 async fn test_euler_hook_low_pool_manager_balance() {
1318 let mut decoder = TychoStreamDecoder::new();
1319
1320 decoder.register_decoder_with_context::<crate::evm::protocol::uniswap_v4::state::UniswapV4State>(
1321 "uniswap_v4_hooks", DecoderContext::new().vm_traces(true)
1322 );
1323
1324 let weth = Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap();
1325 let teth = Bytes::from_str("0xd11c452fc99cf405034ee446803b6f6c1f6d5ed8").unwrap();
1326 let tokens = HashMap::from([
1327 (
1328 weth.clone(),
1329 Token::new(&weth, "WETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1330 ),
1331 (
1332 teth.clone(),
1333 Token::new(&teth, "tETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1334 ),
1335 ]);
1336
1337 decoder.set_tokens(tokens.clone()).await;
1338
1339 let msg = load_test_msg("euler_hook_snapshot");
1340 let res = decoder
1341 .decode(&msg)
1342 .await
1343 .expect("decode failure");
1344
1345 let pool_state = res
1346 .states
1347 .get("0xc70d7fbd7fcccdf726e02fed78548b40dc52502b097c7a1ee7d995f4d4396134")
1348 .expect("Couldn't find target pool");
1349 let amount_out = pool_state
1350 .get_amount_out(
1351 BigUint::from_str("1000000000000000000").unwrap(),
1352 tokens.get(&teth).unwrap(),
1353 tokens.get(&weth).unwrap(),
1354 )
1355 .expect("Get amount out failed");
1356
1357 assert_eq!(amount_out.amount, BigUint::from_str("1216190190361759119").unwrap());
1358 }
1359}