1use std::{
2 collections::{hash_map::Entry, HashMap, HashSet},
3 future::Future,
4 pin::Pin,
5 sync::Arc,
6};
7
8use alloy::primitives::{Address, U256};
9use thiserror::Error;
10use tokio::sync::{RwLock, RwLockReadGuard};
11use tracing::{debug, error, info, warn};
12use tycho_client::feed::{synchronizer::ComponentWithState, BlockHeader, FeedMessage, HeaderLike};
13use tycho_common::{
14 dto::{ChangeType, ProtocolStateDelta},
15 models::{blockchain::BlockAggregatedChanges, token::Token, Chain},
16 simulation::protocol_sim::{Balances, ProtocolSim},
17 Bytes,
18};
19#[cfg(test)]
20use {
21 mockall::mock,
22 num_bigint::BigUint,
23 std::any::Any,
24 tycho_common::simulation::{
25 errors::{SimulationError, TransitionError},
26 protocol_sim::GetAmountOutResult,
27 },
28};
29
30use crate::{
31 evm::{
32 engine_db::{update_engine, SHARED_TYCHO_DB},
33 protocol::{
34 utils::bytes_to_address,
35 vm::{constants::ERC20_PROXY_BYTECODE, erc20_token::IMPLEMENTATION_SLOT},
36 },
37 tycho_models::{AccountUpdate, ResponseAccount},
38 },
39 protocol::{
40 errors::InvalidSnapshotError,
41 models::{DecoderContext, ProtocolComponent, TryFromWithBlock, Update},
42 },
43};
44
45#[derive(Error, Debug)]
46pub enum StreamDecodeError {
47 #[error("{0}")]
48 Fatal(String),
49}
50
51#[derive(Default)]
52struct DecoderState {
53 tokens: HashMap<Bytes, Token>,
54 states: HashMap<String, Box<dyn ProtocolSim>>,
55 components: HashMap<String, ProtocolComponent>,
56 contracts_map: HashMap<Bytes, HashSet<String>>,
58 proxy_token_addresses: HashMap<Address, Address>,
60 failed_components: HashSet<String>,
64 current_block_number: u64,
66}
67
68type DecodeFut =
69 Pin<Box<dyn Future<Output = Result<Box<dyn ProtocolSim>, InvalidSnapshotError>> + Send + Sync>>;
70type AccountBalances = HashMap<Bytes, HashMap<Bytes, Bytes>>;
71type RegistryFn<H> = dyn Fn(ComponentWithState, H, AccountBalances, Arc<RwLock<DecoderState>>) -> DecodeFut
72 + Send
73 + Sync;
74type FilterFn = fn(&ComponentWithState) -> bool;
75
76pub struct TychoStreamDecoder<H>
89where
90 H: HeaderLike,
91{
92 state: Arc<RwLock<DecoderState>>,
93 skip_state_decode_failures: bool,
94 min_token_quality: u32,
95 registry: HashMap<String, Box<RegistryFn<H>>>,
96 inclusion_filters: HashMap<String, FilterFn>,
97}
98
99impl<H> Default for TychoStreamDecoder<H>
100where
101 H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
102{
103 fn default() -> Self {
104 Self::new()
105 }
106}
107
108impl<H> TychoStreamDecoder<H>
109where
110 H: HeaderLike + Clone + Sync + Send + 'static + std::fmt::Debug,
111{
112 pub fn new() -> Self {
113 Self {
114 state: Arc::new(RwLock::new(DecoderState::default())),
115 skip_state_decode_failures: false,
116 min_token_quality: 100,
117 registry: HashMap::new(),
118 inclusion_filters: HashMap::new(),
119 }
120 }
121
122 pub async fn set_tokens(&self, tokens: HashMap<Bytes, Token>) {
127 let mut guard = self.state.write().await;
128 guard.tokens = tokens;
129 }
130
131 pub fn skip_state_decode_failures(&mut self, skip: bool) {
132 self.skip_state_decode_failures = skip;
133 }
134
135 pub fn min_token_quality(&mut self, quality: u32) {
141 self.min_token_quality = quality;
142 }
143
144 pub fn register_decoder_with_context<T>(&mut self, exchange: &str, context: DecoderContext)
157 where
158 T: ProtocolSim
159 + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
160 + Send
161 + 'static,
162 {
163 let decoder = Box::new(
164 move |component: ComponentWithState,
165 header: H,
166 account_balances: AccountBalances,
167 state: Arc<RwLock<DecoderState>>| {
168 let context = context.clone();
169 Box::pin(async move {
170 let guard = state.read().await;
171 T::try_from_with_header(
172 component,
173 header,
174 &account_balances,
175 &guard.tokens,
176 &context,
177 )
178 .await
179 .map(|c| Box::new(c) as Box<dyn ProtocolSim>)
180 }) as DecodeFut
181 },
182 );
183 self.registry
184 .insert(exchange.to_string(), decoder);
185 }
186
187 pub fn register_decoder<T>(&mut self, exchange: &str)
199 where
200 T: ProtocolSim
201 + TryFromWithBlock<ComponentWithState, H, Error = InvalidSnapshotError>
202 + Send
203 + 'static,
204 {
205 let context = DecoderContext::new();
206 self.register_decoder_with_context::<T>(exchange, context);
207 }
208
209 pub fn register_filter(&mut self, exchange: &str, predicate: FilterFn) {
225 self.inclusion_filters
226 .insert(exchange.to_string(), predicate);
227 }
228
229 pub async fn decode(&self, msg: &FeedMessage<H>) -> Result<Update, StreamDecodeError> {
232 let mut updated_states = HashMap::new();
234 let mut new_pairs = HashMap::new();
235 let mut removed_pairs = HashMap::new();
236 let mut contracts_map = HashMap::new();
237 let mut msg_failed_components = HashSet::new();
238
239 let header = msg
240 .state_msgs
241 .values()
242 .next()
243 .ok_or_else(|| StreamDecodeError::Fatal("Missing block!".into()))?
244 .header
245 .clone();
246
247 let block_number_or_timestamp = header
248 .clone()
249 .block_number_or_timestamp();
250 let current_block = header.clone().block();
251 let is_partial = current_block
252 .as_ref()
253 .map(|h| h.partial_block_index.is_some())
254 .unwrap_or(false);
255
256 for (protocol, protocol_msg) in msg.state_msgs.iter() {
257 if let Some(deltas) = protocol_msg.deltas.as_ref() {
259 let mut state_guard = self.state.write().await;
260
261 let new_tokens = deltas
262 .new_tokens
263 .iter()
264 .filter(|(addr, t)| {
265 t.quality >= self.min_token_quality &&
266 !state_guard.tokens.contains_key(*addr)
267 })
268 .map(|(addr, t)| (addr.clone(), t.clone()))
269 .collect::<HashMap<Bytes, Token>>();
270
271 if !new_tokens.is_empty() {
272 debug!(n = new_tokens.len(), "NewTokens");
273 state_guard.tokens.extend(new_tokens);
274 }
275 }
276
277 {
279 let mut state_guard = self.state.write().await;
280 let removed_components: Vec<(String, ProtocolComponent)> = protocol_msg
281 .removed_components
282 .iter()
283 .map(|(id, comp)| {
284 if *id != comp.id {
285 error!(
286 "Component id mismatch in removed components {id} != {}",
287 comp.id
288 );
289 return Err(StreamDecodeError::Fatal("Component id mismatch".into()));
290 }
291
292 let tokens = comp
293 .tokens
294 .iter()
295 .flat_map(|addr| state_guard.tokens.get(addr).cloned())
296 .collect::<Vec<_>>();
297
298 if tokens.len() == comp.tokens.len() {
299 Ok(Some((
300 id.clone(),
301 ProtocolComponent::from_with_tokens(comp.clone(), tokens),
302 )))
303 } else {
304 Ok(None)
305 }
306 })
307 .collect::<Result<Vec<Option<(String, ProtocolComponent)>>, StreamDecodeError>>(
308 )?
309 .into_iter()
310 .flatten()
311 .collect();
312
313 for (id, component) in removed_components {
315 state_guard.components.remove(&id);
316 state_guard.states.remove(&id);
317 removed_pairs.insert(id, component);
318 }
319
320 info!(
322 "Processing {} contracts from snapshots",
323 protocol_msg
324 .snapshots
325 .get_vm_storage()
326 .len()
327 );
328
329 let mut proxy_token_accounts: HashMap<Address, AccountUpdate> = HashMap::new();
330 let mut storage_by_address: HashMap<Address, ResponseAccount> = HashMap::new();
331 for (key, value) in protocol_msg
332 .snapshots
333 .get_vm_storage()
334 .iter()
335 {
336 let account: ResponseAccount = value.clone().into();
337
338 if state_guard.tokens.contains_key(key) {
339 let original_address = account.address;
340 let (impl_addr, proxy_state) = match state_guard
349 .proxy_token_addresses
350 .get(&original_address)
351 {
352 Some(impl_addr) => {
353 let proxy_state = AccountUpdate::new(
360 original_address,
361 value.chain,
362 account.slots.clone(),
363 Some(account.native_balance),
364 None,
365 ChangeType::Update,
366 );
367 (*impl_addr, proxy_state)
368 }
369 None => {
370 let impl_addr = generate_proxy_token_address(
374 state_guard.proxy_token_addresses.len() as u32,
375 )?;
376 state_guard
377 .proxy_token_addresses
378 .insert(original_address, impl_addr);
379
380 let proxy_state = create_proxy_token_account(
382 original_address,
383 Some(impl_addr),
384 &account.slots,
385 value.chain,
386 Some(account.native_balance),
387 );
388
389 (impl_addr, proxy_state)
390 }
391 };
392
393 proxy_token_accounts.insert(original_address, proxy_state);
394
395 let impl_update = ResponseAccount {
397 address: impl_addr,
398 slots: HashMap::new(),
399 ..account.clone()
400 };
401 storage_by_address.insert(impl_addr, impl_update);
402 } else {
403 storage_by_address.insert(account.address, account);
405 }
406 }
407
408 let mut proxy_creates: Vec<AccountUpdate> = Vec::new();
412 let mut proxy_updates: HashMap<Address, AccountUpdate> = HashMap::new();
413 for (addr, update) in proxy_token_accounts {
414 if matches!(update.change, ChangeType::Creation) {
415 proxy_creates.push(update);
416 } else {
417 proxy_updates.insert(addr, update);
418 }
419 }
420
421 info!("Updating engine with {} contracts from snapshots", storage_by_address.len());
422 update_engine(
423 SHARED_TYCHO_DB.clone(),
424 header.clone().block(),
425 Some(storage_by_address),
426 proxy_updates,
427 )
428 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
429
430 if !proxy_creates.is_empty() {
434 SHARED_TYCHO_DB
435 .force_update_accounts(proxy_creates)
436 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
437 }
438 info!("Engine updated");
439 drop(state_guard);
440 }
441
442 let account_balances = protocol_msg
445 .clone()
446 .snapshots
447 .get_vm_storage()
448 .iter()
449 .filter_map(|(addr, acc)| {
450 if acc.token_balances.is_empty() {
451 return None;
452 }
453 let balances = acc
454 .token_balances
455 .iter()
456 .map(|(token_addr, ab)| (token_addr.clone(), ab.balance.clone()))
457 .collect::<HashMap<Bytes, Bytes>>();
458 Some((addr.clone(), balances))
459 })
460 .collect::<AccountBalances>();
461
462 let mut new_components = HashMap::new();
463 let mut count_token_skips = 0;
464 let mut components_to_store = HashMap::new();
465 {
466 let state_guard = self.state.read().await;
467
468 'snapshot_loop: for (id, snapshot) in protocol_msg
470 .snapshots
471 .get_states()
472 .clone()
473 {
474 if self
476 .inclusion_filters
477 .get(protocol.as_str())
478 .is_some_and(|predicate| !predicate(&snapshot))
479 {
480 continue;
481 }
482
483 let mut component_tokens = Vec::new();
485 let mut new_tokens_accounts = HashMap::new();
486 for token in snapshot.component.tokens.clone() {
487 match state_guard.tokens.get(&token) {
488 Some(token) => {
489 component_tokens.push(token.clone());
490
491 let token_address = match bytes_to_address(&token.address) {
494 Ok(addr) => addr,
495 Err(_) => {
496 count_token_skips += 1;
497 msg_failed_components.insert(id.clone());
498 warn!(
499 "Token address could not be decoded {}, ignoring pool {:x?}",
500 token.address, id
501 );
502 continue 'snapshot_loop;
503 }
504 };
505 if !state_guard
507 .proxy_token_addresses
508 .contains_key(&token_address)
509 {
510 new_tokens_accounts.insert(
511 token_address,
512 create_proxy_token_account(
513 token_address,
514 None,
515 &HashMap::new(),
516 snapshot.component.chain,
517 None,
518 ),
519 );
520 }
521 }
522 None => {
523 count_token_skips += 1;
524 msg_failed_components.insert(id.clone());
525 debug!("Token not found {}, ignoring pool {:x?}", token, id);
526 continue 'snapshot_loop;
527 }
528 }
529 }
530 let component = ProtocolComponent::from_with_tokens(
531 snapshot.component.clone(),
532 component_tokens,
533 );
534
535 if !new_tokens_accounts.is_empty() {
537 update_engine(
538 SHARED_TYCHO_DB.clone(),
539 header.clone().block(),
540 None,
541 new_tokens_accounts,
542 )
543 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
544 }
545
546 if !component
549 .static_attributes
550 .contains_key("manual_updates")
551 {
552 for contract in &component.contract_ids {
553 contracts_map
554 .entry(contract.clone())
555 .or_insert_with(HashSet::new)
556 .insert(id.clone());
557 }
558 for (_, tracing) in snapshot.entrypoints.iter() {
561 for contract in tracing.accessed_slots.keys().cloned() {
562 contracts_map
563 .entry(contract)
564 .or_insert_with(HashSet::new)
565 .insert(id.clone());
566 }
567 }
568 }
569
570 new_pairs.insert(id.clone(), component.clone());
572
573 components_to_store.insert(id.clone(), component);
575
576 if let Some(state_decode_f) = self.registry.get(protocol.as_str()) {
578 match state_decode_f(
579 snapshot,
580 header.clone(),
581 account_balances.clone(),
582 self.state.clone(),
583 )
584 .await
585 {
586 Ok(state) => {
587 new_components.insert(id.clone(), state);
588 }
589 Err(e) => {
590 if self.skip_state_decode_failures {
591 warn!(pool = id, error = %e, "StateDecodingFailure");
592 msg_failed_components.insert(id.clone());
593 continue 'snapshot_loop;
594 } else {
595 error!(pool = id, error = %e, "StateDecodingFailure");
596 return Err(StreamDecodeError::Fatal(format!("{e}")));
597 }
598 }
599 }
600 } else if self.skip_state_decode_failures {
601 warn!(pool = id, "MissingDecoderRegistration");
602 msg_failed_components.insert(id.clone());
603 continue 'snapshot_loop;
604 } else {
605 error!(pool = id, "MissingDecoderRegistration");
606 return Err(StreamDecodeError::Fatal(format!(
607 "Missing decoder registration for: {id}"
608 )));
609 }
610 }
611 }
612
613 if !components_to_store.is_empty() {
615 let mut state_guard = self.state.write().await;
616 for (id, component) in components_to_store {
617 state_guard
618 .components
619 .insert(id, component);
620 }
621 }
622
623 if !protocol_msg.snapshots.states.is_empty() {
624 info!("Decoded {} snapshots for protocol {protocol}", new_components.len());
625 }
626 if count_token_skips > 0 {
627 info!("Skipped {count_token_skips} pools due to missing tokens");
628 }
629
630 updated_states.extend(new_components);
632
633 if let Some(deltas) = protocol_msg.deltas.clone() {
635 let mut state_guard = self.state.write().await;
637
638 let mut account_update_by_address: HashMap<Address, AccountUpdate> = HashMap::new();
639 let mut new_proxy_accounts: Vec<AccountUpdate> = Vec::new();
641 for (key, value) in deltas.account_deltas.iter() {
642 let mut update: AccountUpdate = value.clone().into();
643
644 if update.code.is_none() && matches!(update.change, ChangeType::Creation) {
650 error!(
651 update = ?update,
652 "FaultyCreationDelta"
653 );
654 update.code = Some(vec![]);
655 }
656
657 if state_guard.tokens.contains_key(key) {
658 let original_address = update.address;
659 let impl_addr = match state_guard
666 .proxy_token_addresses
667 .get(&original_address)
668 {
669 Some(impl_addr) => {
670 let proxy_update = AccountUpdate { code: None, ..update.clone() };
674 account_update_by_address.insert(original_address, proxy_update);
675
676 *impl_addr
677 }
678 None => {
679 let impl_addr = generate_proxy_token_address(
684 state_guard.proxy_token_addresses.len() as u32,
685 )?;
686 state_guard
687 .proxy_token_addresses
688 .insert(original_address, impl_addr);
689
690 let proxy_state = create_proxy_token_account(
695 original_address,
696 Some(impl_addr),
697 &update.slots,
698 update.chain,
699 update.balance,
700 );
701 new_proxy_accounts.push(proxy_state);
702
703 impl_addr
704 }
705 };
706
707 if update.code.is_some() {
709 let impl_update = AccountUpdate {
710 address: impl_addr,
711 slots: HashMap::new(),
712 ..update.clone()
713 };
714 account_update_by_address.insert(impl_addr, impl_update);
715 }
716 } else {
717 account_update_by_address.insert(update.address, update);
719 }
720 }
721 drop(state_guard);
722
723 let state_guard = self.state.read().await;
724 info!("Updating engine with {} contract deltas", deltas.account_deltas.len());
725 update_engine(
726 SHARED_TYCHO_DB.clone(),
727 header.clone().block(),
728 None,
729 account_update_by_address,
730 )
731 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
732
733 if !new_proxy_accounts.is_empty() {
736 SHARED_TYCHO_DB
737 .force_update_accounts(new_proxy_accounts)
738 .map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
739 }
740 info!("Engine updated");
741
742 let mut pools_to_update = HashSet::new();
744 for (account, _update) in deltas.account_deltas {
745 pools_to_update.extend(
747 contracts_map
748 .get(&account)
749 .cloned()
750 .unwrap_or_default(),
751 );
752 pools_to_update.extend(
754 state_guard
755 .contracts_map
756 .get(&account)
757 .cloned()
758 .unwrap_or_default(),
759 );
760 }
761
762 let all_balances = Balances {
764 component_balances: deltas
765 .component_balances
766 .iter()
767 .map(|(pool_id, bals)| {
768 let mut balances = HashMap::new();
769 for (t, b) in bals {
770 balances.insert(t.clone(), b.balance.clone());
771 }
772 pools_to_update.insert(pool_id.clone());
773 (pool_id.clone(), balances)
774 })
775 .collect(),
776 account_balances: deltas
777 .account_balances
778 .iter()
779 .map(|(account, bals)| {
780 let mut balances = HashMap::new();
781 for (t, b) in bals {
782 balances.insert(t.clone(), b.balance.clone());
783 }
784 pools_to_update.extend(
785 contracts_map
786 .get(account)
787 .cloned()
788 .unwrap_or_default(),
789 );
790 (account.clone(), balances)
791 })
792 .collect(),
793 };
794
795 for (id, update) in deltas.state_deltas {
797 let update_with_block = Self::add_block_info_to_delta(
799 ProtocolStateDelta::from(update),
800 current_block.clone(),
801 );
802 match Self::apply_update(
803 &id,
804 update_with_block,
805 &mut updated_states,
806 &state_guard,
807 &all_balances,
808 ) {
809 Ok(_) => {
810 pools_to_update.remove(&id);
811 }
812 Err(e) => {
813 if self.skip_state_decode_failures {
814 warn!(pool = id, error = %e, "Failed to apply state update, marking component as removed");
815 updated_states.remove(&id);
817 if let Some(component) = new_pairs.remove(&id) {
819 removed_pairs.insert(id.clone(), component);
820 } else if let Some(component) = state_guard.components.get(&id) {
821 removed_pairs.insert(id.clone(), component.clone());
822 } else {
823 warn!(pool = id, "Component not found in new_pairs or state, cannot add to removed_pairs");
826 }
827 pools_to_update.remove(&id);
828
829 msg_failed_components.insert(id.clone());
831 } else {
832 return Err(e);
833 }
834 }
835 }
836 }
837
838 for pool in pools_to_update {
840 let default_delta_with_block = Self::add_block_info_to_delta(
842 ProtocolStateDelta::default(),
843 current_block.clone(),
844 );
845 match Self::apply_update(
846 &pool,
847 default_delta_with_block,
848 &mut updated_states,
849 &state_guard,
850 &all_balances,
851 ) {
852 Ok(_) => {}
853 Err(e) => {
854 if self.skip_state_decode_failures {
855 warn!(pool = pool, error = %e, "Failed to apply contract/balance update, marking component as removed");
856 updated_states.remove(&pool);
858 if let Some(component) = new_pairs.remove(&pool) {
860 removed_pairs.insert(pool.clone(), component);
861 } else if let Some(component) = state_guard.components.get(&pool) {
862 removed_pairs.insert(pool.clone(), component.clone());
863 } else {
864 warn!(pool = pool, "Component not found in new_pairs or state, cannot add to removed_pairs");
867 }
868
869 msg_failed_components.insert(pool.clone());
871 } else {
872 return Err(e);
873 }
874 }
875 }
876 }
877 };
878 }
879
880 let mut state_guard = self.state.write().await;
882
883 state_guard
885 .failed_components
886 .extend(msg_failed_components);
887
888 updated_states.retain(|id, _| {
892 !state_guard
893 .failed_components
894 .contains(id)
895 });
896 new_pairs.retain(|id, _| {
897 !state_guard
898 .failed_components
899 .contains(id)
900 });
901
902 state_guard
903 .states
904 .extend(updated_states.clone());
905
906 state_guard.current_block_number = block_number_or_timestamp;
907
908 for (id, component) in new_pairs.iter() {
910 state_guard
911 .components
912 .insert(id.clone(), component.clone());
913 }
914
915 for id in removed_pairs.keys() {
917 state_guard.components.remove(id);
918 }
919
920 for (key, values) in contracts_map {
921 state_guard
922 .contracts_map
923 .entry(key)
924 .or_insert_with(HashSet::new)
925 .extend(values);
926 }
927
928 Ok(Update::new(block_number_or_timestamp, updated_states, new_pairs)
930 .set_is_partial(is_partial)
931 .set_removed_pairs(removed_pairs)
932 .set_sync_states(msg.sync_states.clone()))
933 }
934
935 pub async fn apply_deltas_ephemeral(
954 &self,
955 pending_deltas: &HashMap<String, BlockAggregatedChanges>,
956 header: H,
957 ) -> Result<Update, StreamDecodeError> {
958 let block_number_or_timestamp = header
959 .clone()
960 .block_number_or_timestamp();
961 let current_block = header.block();
962 let state_guard = self.state.read().await;
963
964 let mut updated_states: HashMap<String, Box<dyn ProtocolSim>> = HashMap::new();
965
966 for deltas in pending_deltas.values() {
967 let all_balances = Balances {
968 component_balances: deltas
969 .component_balances
970 .iter()
971 .map(|(pool_id, bals)| {
972 let balances = bals
973 .iter()
974 .map(|(t, b)| (t.clone(), b.balance.clone()))
975 .collect();
976 (pool_id.clone(), balances)
977 })
978 .collect(),
979 account_balances: HashMap::new(),
980 };
981
982 for (id, state_delta) in &deltas.state_deltas {
983 let dto_delta = Self::add_block_info_to_delta(
984 ProtocolStateDelta::from(state_delta.clone()),
985 current_block.clone(),
986 );
987 if let Err(e) = Self::apply_update(
988 id,
989 dto_delta,
990 &mut updated_states,
991 &state_guard,
992 &all_balances,
993 ) {
994 warn!(pool = id, error = %e, "EphemeralDeltaTransitionError");
995 }
996 }
997 }
998
999 Ok(Update::new(block_number_or_timestamp, updated_states, HashMap::new()))
1000 }
1001
1002 fn add_block_info_to_delta(
1004 mut delta: ProtocolStateDelta,
1005 block_header_opt: Option<BlockHeader>,
1006 ) -> ProtocolStateDelta {
1007 if let Some(header) = block_header_opt {
1008 delta.updated_attributes.insert(
1011 "block_number".to_string(),
1012 Bytes::from(header.number.to_be_bytes().to_vec()),
1013 );
1014 delta.updated_attributes.insert(
1015 "block_timestamp".to_string(),
1016 Bytes::from(header.timestamp.to_be_bytes().to_vec()),
1017 );
1018 }
1019 delta
1020 }
1021
1022 fn apply_update(
1023 id: &String,
1024 update: ProtocolStateDelta,
1025 updated_states: &mut HashMap<String, Box<dyn ProtocolSim>>,
1026 state_guard: &RwLockReadGuard<'_, DecoderState>,
1027 all_balances: &Balances,
1028 ) -> Result<(), StreamDecodeError> {
1029 match updated_states.entry(id.clone()) {
1030 Entry::Occupied(mut entry) => {
1031 let state: &mut Box<dyn ProtocolSim> = entry.get_mut();
1033 state
1034 .delta_transition(update, &state_guard.tokens, all_balances)
1035 .map_err(|e| {
1036 error!(pool = id, error = ?e, "DeltaTransitionError");
1037 StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
1038 })?;
1039 }
1040 Entry::Vacant(_) => {
1041 match state_guard.states.get(id) {
1042 Some(stored_state) => {
1045 let mut state = stored_state.clone();
1046 state
1047 .delta_transition(update, &state_guard.tokens, all_balances)
1048 .map_err(|e| {
1049 error!(pool = id, error = ?e, "DeltaTransitionError");
1050 StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
1051 })?;
1052 updated_states.insert(id.clone(), state);
1053 }
1054 None => debug!(pool = id, reason = "MissingState", "DeltaTransitionError"),
1055 }
1056 }
1057 }
1058 Ok(())
1059 }
1060}
1061
1062fn generate_proxy_token_address(idx: u32) -> Result<Address, StreamDecodeError> {
1064 let padded_idx = format!("{idx:x}");
1065 let padded_zeroes = "0".repeat(33 - padded_idx.len());
1066 let proxy_token_address = format!("{padded_zeroes}{padded_idx}BAdbaBe");
1067 let decoded = hex::decode(proxy_token_address).map_err(|e| {
1068 StreamDecodeError::Fatal(format!("Invalid proxy token address encoding: {e}"))
1069 })?;
1070
1071 const ADDRESS_LENGTH: usize = 20;
1072 if decoded.len() != ADDRESS_LENGTH {
1073 return Err(StreamDecodeError::Fatal(format!(
1074 "Invalid proxy token address length: expected {}, got {}",
1075 ADDRESS_LENGTH,
1076 decoded.len(),
1077 )));
1078 }
1079
1080 Ok(Address::from_slice(&decoded))
1081}
1082
1083fn create_proxy_token_account(
1088 addr: Address,
1089 new_address: Option<Address>,
1090 storage: &HashMap<U256, U256>,
1091 chain: Chain,
1092 balance: Option<U256>,
1093) -> AccountUpdate {
1094 let mut slots = storage.clone();
1095 if let Some(new_address) = new_address {
1096 slots.insert(*IMPLEMENTATION_SLOT, U256::from_be_slice(new_address.as_slice()));
1097 }
1098
1099 AccountUpdate {
1100 address: addr,
1101 chain,
1102 slots,
1103 balance,
1104 code: Some(ERC20_PROXY_BYTECODE.to_vec()),
1105 change: ChangeType::Creation,
1106 }
1107}
1108
// Test-only mock generated by `mockall`. The listed methods become inherent
// methods of `MockProtocolSim`, with matching `expect_*` setters.
#[cfg(test)]
mock! {
    #[derive(Debug)]
    pub ProtocolSim {
        pub fn fee(&self) -> f64;
        pub fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError>;
        pub fn get_amount_out(
            &self,
            amount_in: BigUint,
            token_in: &Token,
            token_out: &Token,
        ) -> Result<GetAmountOutResult, SimulationError>;
        pub fn get_limits(
            &self,
            sell_token: Bytes,
            buy_token: Bytes,
        ) -> Result<(BigUint, BigUint), SimulationError>;
        pub fn delta_transition(
            &mut self,
            delta: ProtocolStateDelta,
            tokens: &HashMap<Bytes, Token>,
            balances: &Balances,
        ) -> Result<(), TransitionError>;
        pub fn clone_box(&self) -> Box<dyn ProtocolSim>;
        pub fn eq(&self, other: &dyn ProtocolSim) -> bool;
    }
}

// Marks the mock as a non-serializable protocol, labelled "test protocol".
#[cfg(test)]
crate::impl_non_serializable_protocol!(MockProtocolSim, "test protocol");
1139
/// Routes the [`ProtocolSim`] trait methods to the mock's expectations.
///
/// Each `self.method(..)` call below resolves to the inherent method generated
/// by `mock!` (inherent methods win over trait methods), so the calls do not
/// recurse.
#[cfg(test)]
impl ProtocolSim for MockProtocolSim {
    fn fee(&self) -> f64 {
        self.fee()
    }

    fn spot_price(&self, base: &Token, quote: &Token) -> Result<f64, SimulationError> {
        self.spot_price(base, quote)
    }

    fn get_amount_out(
        &self,
        amount_in: BigUint,
        token_in: &Token,
        token_out: &Token,
    ) -> Result<GetAmountOutResult, SimulationError> {
        self.get_amount_out(amount_in, token_in, token_out)
    }

    fn get_limits(
        &self,
        sell_token: Bytes,
        buy_token: Bytes,
    ) -> Result<(BigUint, BigUint), SimulationError> {
        self.get_limits(sell_token, buy_token)
    }

    fn delta_transition(
        &mut self,
        delta: ProtocolStateDelta,
        tokens: &HashMap<Bytes, Token>,
        balances: &Balances,
    ) -> Result<(), TransitionError> {
        self.delta_transition(delta, tokens, balances)
    }

    fn clone_box(&self) -> Box<dyn ProtocolSim> {
        self.clone_box()
    }

    /// Downcasting is not supported by the mock.
    fn as_any(&self) -> &dyn Any {
        panic!("MockProtocolSim does not support as_any")
    }

    /// Downcasting is not supported by the mock.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        panic!("MockProtocolSim does not support as_any_mut")
    }

    fn eq(&self, other: &dyn ProtocolSim) -> bool {
        self.eq(other)
    }

    // The typetag hooks are not expected to be called on the mock.
    fn typetag_name(&self) -> &'static str {
        unreachable!()
    }

    fn typetag_deserialize(&self) {
        unreachable!()
    }
}
1200
1201#[cfg(test)]
1202mod tests {
1203 use std::str::FromStr;
1204
1205 use alloy::primitives::address;
1206 use mockall::predicate::*;
1207 use rstest::*;
1208 use tycho_client::feed::BlockHeader;
1209 use tycho_common::{models::Chain, Bytes};
1210
1211 use super::*;
1212 use crate::evm::protocol::uniswap_v2::state::UniswapV2State;
1213
1214 async fn setup_decoder(set_tokens: bool) -> TychoStreamDecoder<BlockHeader> {
1215 let mut decoder = TychoStreamDecoder::new();
1216 decoder.register_decoder::<UniswapV2State>("uniswap_v2");
1217 if set_tokens {
1218 let tokens = [
1219 Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0),
1220 Bytes::from("0xdac17f958d2ee523a2206206994597c13d831ec7").lpad(20, 0),
1221 ]
1222 .iter()
1223 .map(|addr| {
1224 let addr_str = format!("{addr:x}");
1225 (
1226 addr.clone(),
1227 Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1228 )
1229 })
1230 .collect();
1231 decoder.set_tokens(tokens).await;
1232 }
1233 decoder
1234 }
1235
1236 fn load_test_msg(name: &str) -> FeedMessage<BlockHeader> {
1237 use std::{fs, path::Path};
1238
1239 use tycho_client::feed::dto;
1240 let project_root = env!("CARGO_MANIFEST_DIR");
1241 let asset_path = Path::new(project_root).join(format!("tests/assets/decoder/{name}.json"));
1242 let json_data = fs::read_to_string(asset_path).expect("Failed to read test asset");
1243 let feed_msg: dto::FeedMessage<BlockHeader> =
1244 serde_json::from_str(&json_data).expect("Failed to deserialize FeedMsg json!");
1245 FeedMessage::from(feed_msg)
1246 }
1247
1248 #[tokio::test]
1249 async fn test_decode() {
1250 let decoder = setup_decoder(true).await;
1251
1252 let msg = load_test_msg("uniswap_v2_snapshot");
1253 let res1 = decoder
1254 .decode(&msg)
1255 .await
1256 .expect("decode failure");
1257 let msg = load_test_msg("uniswap_v2_delta");
1258 let res2 = decoder
1259 .decode(&msg)
1260 .await
1261 .expect("decode failure");
1262
1263 assert_eq!(res1.states.len(), 1);
1264 assert_eq!(res2.states.len(), 1);
1265 assert_eq!(res1.sync_states.len(), 1);
1266 assert_eq!(res2.sync_states.len(), 1);
1267 }
1268
1269 #[tokio::test]
1270 async fn test_decode_component_missing_token() {
1271 let decoder = setup_decoder(false).await;
1272 let tokens = [Bytes::from("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").lpad(20, 0)]
1273 .iter()
1274 .map(|addr| {
1275 let addr_str = format!("{addr:x}");
1276 (
1277 addr.clone(),
1278 Token::new(addr, &addr_str, 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1279 )
1280 })
1281 .collect();
1282 decoder.set_tokens(tokens).await;
1283
1284 let msg = load_test_msg("uniswap_v2_snapshot");
1285 let res1 = decoder
1286 .decode(&msg)
1287 .await
1288 .expect("decode failure");
1289
1290 assert_eq!(res1.states.len(), 0);
1291 }
1292
1293 #[tokio::test]
1294 async fn test_decode_component_bad_id() {
1295 let decoder = setup_decoder(true).await;
1296 let msg = load_test_msg("uniswap_v2_snapshot_broken_id");
1297
1298 match decoder.decode(&msg).await {
1299 Err(StreamDecodeError::Fatal(msg)) => {
1300 assert_eq!(msg, "Component id mismatch");
1301 }
1302 Ok(_) => {
1303 panic!("Expected failures to be raised")
1304 }
1305 }
1306 }
1307
1308 #[rstest]
1309 #[case(true)]
1310 #[case(false)]
1311 #[tokio::test]
1312 async fn test_decode_component_bad_state(#[case] skip_failures: bool) {
1313 let mut decoder = setup_decoder(true).await;
1314 decoder.skip_state_decode_failures = skip_failures;
1315
1316 let msg = load_test_msg("uniswap_v2_snapshot_broken_state");
1317 match decoder.decode(&msg).await {
1318 Err(StreamDecodeError::Fatal(msg)) => {
1319 if !skip_failures {
1320 assert_eq!(msg, "Missing attributes reserve0");
1321 } else {
1322 panic!("Expected failures to be ignored. Err: {msg}")
1323 }
1324 }
1325 Ok(res) => {
1326 if !skip_failures {
1327 panic!("Expected failures to be raised")
1328 } else {
1329 assert_eq!(res.states.len(), 0);
1330 }
1331 }
1332 }
1333 }
1334
1335 #[tokio::test]
1336 async fn test_decode_updates_state_on_contract_change() {
1337 let decoder = setup_decoder(true).await;
1338
1339 let mut mock_state = MockProtocolSim::new();
1341
1342 mock_state
1343 .expect_clone_box()
1344 .times(1)
1345 .returning(|| {
1346 let mut cloned_mock_state = MockProtocolSim::new();
1347 cloned_mock_state
1349 .expect_delta_transition()
1350 .times(1)
1351 .returning(|_, _, _| Ok(()));
1352 cloned_mock_state
1353 .expect_clone_box()
1354 .times(1)
1355 .returning(|| Box::new(MockProtocolSim::new()));
1356 Box::new(cloned_mock_state)
1357 });
1358
1359 let pool_id =
1361 "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2".to_string();
1362 decoder
1363 .state
1364 .write()
1365 .await
1366 .states
1367 .insert(pool_id.clone(), Box::new(mock_state) as Box<dyn ProtocolSim>);
1368 decoder
1369 .state
1370 .write()
1371 .await
1372 .contracts_map
1373 .insert(
1374 Bytes::from("0xba12222222228d8ba445958a75a0704d566bf2c8").lpad(20, 0),
1375 HashSet::from([pool_id.clone()]),
1376 );
1377
1378 let msg = load_test_msg("balancer_v2_delta");
1380
1381 let _ = decoder
1383 .decode(&msg)
1384 .await
1385 .expect("decode failure");
1386
1387 }
1389
/// The generated address is the hex index followed by the `badbabe` marker.
#[test]
fn test_generate_proxy_token_address() {
    let cases = [
        (1, address!("000000000000000000000000000000001badbabe")),
        (123456, address!("00000000000000000000000000001e240badbabe")),
    ];

    for (idx, expected) in cases {
        let generated =
            generate_proxy_token_address(idx).expect("proxy token address should be valid");
        assert_eq!(generated, expected);
    }
}
1402
1403 #[tokio::test(flavor = "multi_thread")]
1404 async fn test_euler_hook_low_pool_manager_balance() {
1405 let mut decoder = TychoStreamDecoder::new();
1406
1407 decoder.register_decoder_with_context::<crate::evm::protocol::uniswap_v4::state::UniswapV4State>(
1408 "uniswap_v4_hooks", DecoderContext::new().vm_traces(true)
1409 );
1410
1411 let weth = Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap();
1412 let teth = Bytes::from_str("0xd11c452fc99cf405034ee446803b6f6c1f6d5ed8").unwrap();
1413 let tokens = HashMap::from([
1414 (
1415 weth.clone(),
1416 Token::new(&weth, "WETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1417 ),
1418 (
1419 teth.clone(),
1420 Token::new(&teth, "tETH", 18, 100, &[Some(100_000)], Chain::Ethereum, 100),
1421 ),
1422 ]);
1423
1424 decoder.set_tokens(tokens.clone()).await;
1425
1426 let msg = load_test_msg("euler_hook_snapshot");
1427 let res = decoder
1428 .decode(&msg)
1429 .await
1430 .expect("decode failure");
1431
1432 let pool_state = res
1433 .states
1434 .get("0xc70d7fbd7fcccdf726e02fed78548b40dc52502b097c7a1ee7d995f4d4396134")
1435 .expect("Couldn't find target pool");
1436 let amount_out = pool_state
1437 .get_amount_out(
1438 BigUint::from_str("1000000000000000000").unwrap(),
1439 tokens.get(&teth).unwrap(),
1440 tokens.get(&weth).unwrap(),
1441 )
1442 .expect("Get amount out failed");
1443
1444 assert_eq!(amount_out.amount, BigUint::from_str("1216190190361759119").unwrap());
1445 }
1446}