tycho_common/models/
blockchain.rs

1use std::{
2    any::Any,
3    collections::{hash_map::Entry, HashMap},
4    sync::Arc,
5};
6
7use chrono::NaiveDateTime;
8use serde::{Deserialize, Serialize};
9use tracing::warn;
10
11use crate::{
12    models::{
13        contract::{AccountBalance, AccountChangesWithTx, AccountDelta},
14        protocol::{
15            ComponentBalance, ProtocolChangesWithTx, ProtocolComponent, ProtocolComponentStateDelta,
16        },
17        token::CurrencyToken,
18        Address, Chain, ComponentId, ExtractorIdentity, NormalisedMessage,
19    },
20    Bytes,
21};
22
23#[derive(Clone, Default, PartialEq, Serialize, Deserialize, Debug)]
24pub struct Block {
25    pub number: u64,
26    pub chain: Chain,
27    pub hash: Bytes,
28    pub parent_hash: Bytes,
29    pub ts: NaiveDateTime,
30}
31
32impl Block {
33    pub fn new(
34        number: u64,
35        chain: Chain,
36        hash: Bytes,
37        parent_hash: Bytes,
38        ts: NaiveDateTime,
39    ) -> Self {
40        Block { hash, parent_hash, number, chain, ts }
41    }
42}
43
44#[derive(Clone, Default, PartialEq, Debug)]
45pub struct Transaction {
46    pub hash: Bytes,
47    pub block_hash: Bytes,
48    pub from: Bytes,
49    pub to: Option<Bytes>,
50    pub index: u64,
51}
52
53impl Transaction {
54    pub fn new(hash: Bytes, block_hash: Bytes, from: Bytes, to: Option<Bytes>, index: u64) -> Self {
55        Transaction { hash, block_hash, from, to, index }
56    }
57}
58
59pub struct BlockTransactionDeltas<T> {
60    pub extractor: String,
61    pub chain: Chain,
62    pub block: Block,
63    pub revert: bool,
64    pub deltas: Vec<TransactionDeltaGroup<T>>,
65}
66
67#[allow(dead_code)]
68pub struct TransactionDeltaGroup<T> {
69    changes: T,
70    protocol_component: HashMap<String, ProtocolComponent>,
71    component_balances: HashMap<String, ComponentBalance>,
72    component_tvl: HashMap<String, f64>,
73    tx: Transaction,
74}
75
76#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
77pub struct BlockAggregatedChanges {
78    pub extractor: String,
79    pub chain: Chain,
80    pub block: Block,
81    pub finalized_block_height: u64,
82    pub revert: bool,
83    pub state_deltas: HashMap<String, ProtocolComponentStateDelta>,
84    pub account_deltas: HashMap<Bytes, AccountDelta>,
85    pub new_tokens: HashMap<Address, CurrencyToken>,
86    pub new_protocol_components: HashMap<String, ProtocolComponent>,
87    pub deleted_protocol_components: HashMap<String, ProtocolComponent>,
88    pub component_balances: HashMap<ComponentId, HashMap<Bytes, ComponentBalance>>,
89    pub account_balances: HashMap<Address, HashMap<Address, AccountBalance>>,
90    pub component_tvl: HashMap<String, f64>,
91}
92
93impl BlockAggregatedChanges {
94    #[allow(clippy::too_many_arguments)]
95    pub fn new(
96        extractor: &str,
97        chain: Chain,
98        block: Block,
99        finalized_block_height: u64,
100        revert: bool,
101        state_deltas: HashMap<String, ProtocolComponentStateDelta>,
102        account_deltas: HashMap<Bytes, AccountDelta>,
103        new_tokens: HashMap<Address, CurrencyToken>,
104        new_components: HashMap<String, ProtocolComponent>,
105        deleted_components: HashMap<String, ProtocolComponent>,
106        component_balances: HashMap<ComponentId, HashMap<Bytes, ComponentBalance>>,
107        account_balances: HashMap<Address, HashMap<Address, AccountBalance>>,
108        component_tvl: HashMap<String, f64>,
109    ) -> Self {
110        Self {
111            extractor: extractor.to_string(),
112            chain,
113            block,
114            finalized_block_height,
115            revert,
116            state_deltas,
117            account_deltas,
118            new_tokens,
119            new_protocol_components: new_components,
120            deleted_protocol_components: deleted_components,
121            component_balances,
122            account_balances,
123            component_tvl,
124        }
125    }
126}
127
128impl std::fmt::Display for BlockAggregatedChanges {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        write!(f, "block_number: {}, extractor: {}", self.block.number, self.extractor)
131    }
132}
133
134#[typetag::serde]
135impl NormalisedMessage for BlockAggregatedChanges {
136    fn source(&self) -> ExtractorIdentity {
137        ExtractorIdentity::new(self.chain, &self.extractor)
138    }
139
140    fn drop_state(&self) -> Arc<dyn NormalisedMessage> {
141        Arc::new(Self {
142            extractor: self.extractor.clone(),
143            chain: self.chain,
144            block: self.block.clone(),
145            finalized_block_height: self.finalized_block_height,
146            revert: self.revert,
147            account_deltas: HashMap::new(),
148            state_deltas: HashMap::new(),
149            new_tokens: self.new_tokens.clone(),
150            new_protocol_components: self.new_protocol_components.clone(),
151            deleted_protocol_components: self.deleted_protocol_components.clone(),
152            component_balances: self.component_balances.clone(),
153            account_balances: self.account_balances.clone(),
154            component_tvl: self.component_tvl.clone(),
155        })
156    }
157
158    fn as_any(&self) -> &dyn Any {
159        self
160    }
161}
162
163pub trait BlockScoped {
164    fn block(&self) -> Block;
165}
166
167impl BlockScoped for BlockAggregatedChanges {
168    fn block(&self) -> Block {
169        self.block.clone()
170    }
171}
172
173/// Changes grouped by their respective transaction.
174#[derive(Debug, Clone, PartialEq, Default)]
175pub struct TxWithChanges {
176    pub protocol_components: HashMap<ComponentId, ProtocolComponent>,
177    pub account_deltas: HashMap<Address, AccountDelta>,
178    pub state_updates: HashMap<ComponentId, ProtocolComponentStateDelta>,
179    pub balance_changes: HashMap<ComponentId, HashMap<Address, ComponentBalance>>,
180    pub account_balance_changes: HashMap<Address, HashMap<Address, AccountBalance>>,
181    pub tx: Transaction,
182}
183
184impl TxWithChanges {
185    pub fn new(
186        protocol_components: HashMap<ComponentId, ProtocolComponent>,
187        account_deltas: HashMap<Address, AccountDelta>,
188        protocol_states: HashMap<ComponentId, ProtocolComponentStateDelta>,
189        balance_changes: HashMap<ComponentId, HashMap<Address, ComponentBalance>>,
190        account_balance_changes: HashMap<Address, HashMap<Address, AccountBalance>>,
191        tx: Transaction,
192    ) -> Self {
193        Self {
194            account_deltas,
195            protocol_components,
196            state_updates: protocol_states,
197            balance_changes,
198            account_balance_changes,
199            tx,
200        }
201    }
202
203    /// Merges this update with another one.
204    ///
205    /// The method combines two `ChangesWithTx` instances if they are for the same
206    /// transaction.
207    ///
208    /// NB: It is assumed that `other` is a more recent update than `self` is and the two are
209    /// combined accordingly.
210    ///
211    /// # Errors
212    /// This method will return an error if any of the above conditions is violated.
213    pub fn merge(&mut self, other: TxWithChanges) -> Result<(), String> {
214        if self.tx.block_hash != other.tx.block_hash {
215            return Err(format!(
216                "Can't merge TxWithChanges from different blocks: 0x{:x} != 0x{:x}",
217                self.tx.block_hash, other.tx.block_hash,
218            ));
219        }
220        if self.tx.hash == other.tx.hash {
221            return Err(format!(
222                "Can't merge TxWithChanges from the same transaction: 0x{:x}",
223                self.tx.hash
224            ));
225        }
226        if self.tx.index > other.tx.index {
227            return Err(format!(
228                "Can't merge TxWithChanges with lower transaction index: {} > {}",
229                self.tx.index, other.tx.index
230            ));
231        }
232
233        self.tx = other.tx;
234
235        // Merge new protocol components
236        // Log a warning if a new protocol component for the same id already exists, because this
237        // should never happen.
238        for (key, value) in other.protocol_components {
239            match self.protocol_components.entry(key) {
240                Entry::Occupied(mut entry) => {
241                    warn!(
242                        "Overwriting new protocol component for id {} with a new one. This should never happen! Please check logic",
243                        entry.get().id
244                    );
245                    entry.insert(value);
246                }
247                Entry::Vacant(entry) => {
248                    entry.insert(value);
249                }
250            }
251        }
252
253        // Merge Account Updates
254        for (address, update) in other.account_deltas.clone().into_iter() {
255            match self.account_deltas.entry(address) {
256                Entry::Occupied(mut e) => {
257                    e.get_mut().merge(update)?;
258                }
259                Entry::Vacant(e) => {
260                    e.insert(update);
261                }
262            }
263        }
264
265        // Merge Protocol States
266        for (key, value) in other.state_updates {
267            match self.state_updates.entry(key) {
268                Entry::Occupied(mut entry) => {
269                    entry.get_mut().merge(value)?;
270                }
271                Entry::Vacant(entry) => {
272                    entry.insert(value);
273                }
274            }
275        }
276
277        // Merge component balance changes
278        for (component_id, balance_changes) in other.balance_changes {
279            let token_balances = self
280                .balance_changes
281                .entry(component_id)
282                .or_default();
283            for (token, balance) in balance_changes {
284                token_balances.insert(token, balance);
285            }
286        }
287
288        // Merge account balance changes
289        for (account_addr, balance_changes) in other.account_balance_changes {
290            let token_balances = self
291                .account_balance_changes
292                .entry(account_addr)
293                .or_default();
294            for (token, balance) in balance_changes {
295                token_balances.insert(token, balance);
296            }
297        }
298
299        Ok(())
300    }
301}
302
303impl From<AccountChangesWithTx> for TxWithChanges {
304    fn from(value: AccountChangesWithTx) -> Self {
305        Self {
306            protocol_components: value.protocol_components,
307            account_deltas: value.account_deltas,
308            state_updates: HashMap::new(),
309            balance_changes: value.component_balances,
310            account_balance_changes: value.account_balances,
311            tx: value.tx,
312        }
313    }
314}
315
316impl From<ProtocolChangesWithTx> for TxWithChanges {
317    fn from(value: ProtocolChangesWithTx) -> Self {
318        Self {
319            protocol_components: value.new_protocol_components,
320            account_deltas: HashMap::new(),
321            state_updates: value.protocol_states,
322            balance_changes: value.balance_changes,
323            account_balance_changes: HashMap::new(),
324            tx: value.tx,
325        }
326    }
327}
328
329#[derive(Copy, Clone, Debug, PartialEq)]
330pub enum BlockTag {
331    /// Finalized block
332    Finalized,
333    /// Safe block
334    Safe,
335    /// Latest block
336    Latest,
337    /// Earliest block (genesis)
338    Earliest,
339    /// Pending block (not yet part of the blockchain)
340    Pending,
341    /// Block by number
342    Number(u64),
343}
344
345#[cfg(test)]
346pub mod fixtures {
347    use std::str::FromStr;
348
349    use super::*;
350    use crate::models::ChangeType;
351
352    pub fn transaction01() -> Transaction {
353        Transaction::new(
354            Bytes::zero(32),
355            Bytes::zero(32),
356            Bytes::zero(20),
357            Some(Bytes::zero(20)),
358            10,
359        )
360    }
361
362    pub fn create_transaction(hash: &str, block: &str, index: u64) -> Transaction {
363        Transaction::new(
364            hash.parse().unwrap(),
365            block.parse().unwrap(),
366            Bytes::zero(20),
367            Some(Bytes::zero(20)),
368            index,
369        )
370    }
371
372    #[test]
373    fn test_merge_tx_with_changes() {
374        let component_id = "ambient_USDC_ETH".to_string();
375        let base_token = Bytes::from_str("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
376        let quote_token = Bytes::from_str("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();
377        let contract_addr = Bytes::from_str("aaaaaaaaa24eeeb8d57d431224f73832bc34f688").unwrap();
378        let tx_hash0 = "0x2f6350a292c0fc918afe67cb893744a080dacb507b0cea4cc07437b8aff23cdb";
379        let tx_hash1 = "0x0d9e0da36cf9f305a189965b248fc79c923619801e8ab5ef158d4fd528a291ad";
380        let block = "0x0000000000000000000000000000000000000000000000000000000000000000";
381        let mut changes1 = TxWithChanges::new(
382            HashMap::from([(
383                component_id.clone(),
384                ProtocolComponent {
385                    id: component_id.clone(),
386                    protocol_system: "test".to_string(),
387                    protocol_type_name: "vm:pool".to_string(),
388                    chain: Chain::Ethereum,
389                    tokens: vec![base_token.clone(), quote_token.clone()],
390                    contract_addresses: vec![contract_addr.clone()],
391                    static_attributes: Default::default(),
392                    change: Default::default(),
393                    creation_tx: Bytes::from_str(tx_hash0).unwrap(),
394                    created_at: Default::default(),
395                },
396            )]),
397            [(
398                contract_addr.clone(),
399                AccountDelta::new(
400                    Chain::Ethereum,
401                    contract_addr.clone(),
402                    HashMap::new(),
403                    None,
404                    Some(vec![0, 0, 0, 0].into()),
405                    ChangeType::Creation,
406                ),
407            )]
408            .into_iter()
409            .collect(),
410            HashMap::new(),
411            HashMap::from([(
412                component_id.clone(),
413                HashMap::from([(
414                    base_token.clone(),
415                    ComponentBalance {
416                        token: base_token.clone(),
417                        balance: Bytes::from(800_u64).lpad(32, 0),
418                        balance_float: 800.0,
419                        component_id: component_id.clone(),
420                        modify_tx: Bytes::from_str(tx_hash0).unwrap(),
421                    },
422                )]),
423            )]),
424            HashMap::from([(
425                contract_addr.clone(),
426                HashMap::from([(
427                    base_token.clone(),
428                    AccountBalance {
429                        token: base_token.clone(),
430                        balance: Bytes::from(800_u64).lpad(32, 0),
431                        modify_tx: Bytes::from_str(tx_hash0).unwrap(),
432                        account: contract_addr.clone(),
433                    },
434                )]),
435            )]),
436            create_transaction(tx_hash0, block, 1),
437        );
438        let changes2 = TxWithChanges::new(
439            HashMap::from([(
440                component_id.clone(),
441                ProtocolComponent {
442                    id: component_id.clone(),
443                    protocol_system: "test".to_string(),
444                    protocol_type_name: "vm:pool".to_string(),
445                    chain: Chain::Ethereum,
446                    tokens: vec![base_token.clone(), quote_token],
447                    contract_addresses: vec![contract_addr.clone()],
448                    static_attributes: Default::default(),
449                    change: Default::default(),
450                    creation_tx: Bytes::from_str(tx_hash1).unwrap(),
451                    created_at: Default::default(),
452                },
453            )]),
454            [(
455                contract_addr.clone(),
456                AccountDelta::new(
457                    Chain::Ethereum,
458                    contract_addr.clone(),
459                    HashMap::new(),
460                    None,
461                    Some(vec![0, 0, 0, 0].into()),
462                    ChangeType::Creation,
463                ),
464            )]
465            .into_iter()
466            .collect(),
467            HashMap::new(),
468            HashMap::from([(
469                component_id.clone(),
470                HashMap::from([(
471                    base_token.clone(),
472                    ComponentBalance {
473                        token: base_token.clone(),
474                        balance: Bytes::from(1000_u64).lpad(32, 0),
475                        balance_float: 1000.0,
476                        component_id: component_id.clone(),
477                        modify_tx: Bytes::from_str(tx_hash1).unwrap(),
478                    },
479                )]),
480            )]),
481            HashMap::from([(
482                contract_addr.clone(),
483                HashMap::from([(
484                    base_token.clone(),
485                    AccountBalance {
486                        token: base_token.clone(),
487                        balance: Bytes::from(1000_u64).lpad(32, 0),
488                        modify_tx: Bytes::from_str(tx_hash1).unwrap(),
489                        account: contract_addr.clone(),
490                    },
491                )]),
492            )]),
493            create_transaction(tx_hash1, block, 2),
494        );
495
496        assert!(changes1.merge(changes2).is_ok());
497        assert_eq!(
498            changes1
499                .account_balance_changes
500                .get(&contract_addr)
501                .unwrap()
502                .get(&base_token)
503                .unwrap()
504                .balance,
505            Bytes::from(1000_u64).lpad(32, 0),
506        );
507        assert_eq!(
508            changes1
509                .balance_changes
510                .get(&component_id)
511                .unwrap()
512                .get(&base_token)
513                .unwrap()
514                .balance,
515            Bytes::from(1000_u64).lpad(32, 0),
516        );
517        assert_eq!(changes1.tx.hash, Bytes::from_str(tx_hash1).unwrap(),);
518    }
519
520    #[test]
521    fn test_merge_different_blocks() {
522        let mut tx1 = TxWithChanges::new(
523            HashMap::new(),
524            HashMap::new(),
525            HashMap::new(),
526            HashMap::new(),
527            HashMap::new(),
528            fixtures::create_transaction("0x01", "0x0abc", 1),
529        );
530
531        let tx2 = TxWithChanges::new(
532            HashMap::new(),
533            HashMap::new(),
534            HashMap::new(),
535            HashMap::new(),
536            HashMap::new(),
537            fixtures::create_transaction("0x02", "0x0def", 2),
538        );
539
540        assert!(tx1.merge(tx2).is_err());
541    }
542
543    #[test]
544    fn test_merge_same_transaction() {
545        let mut tx1 = TxWithChanges::new(
546            HashMap::new(),
547            HashMap::new(),
548            HashMap::new(),
549            HashMap::new(),
550            HashMap::new(),
551            fixtures::create_transaction("0x01", "0x0abc", 1),
552        );
553
554        let tx2 = TxWithChanges::new(
555            HashMap::new(),
556            HashMap::new(),
557            HashMap::new(),
558            HashMap::new(),
559            HashMap::new(),
560            fixtures::create_transaction("0x01", "0x0abc", 1),
561        );
562
563        assert!(tx1.merge(tx2).is_err());
564    }
565
566    #[test]
567    fn test_merge_lower_transaction_index() {
568        let mut tx1 = TxWithChanges::new(
569            HashMap::new(),
570            HashMap::new(),
571            HashMap::new(),
572            HashMap::new(),
573            HashMap::new(),
574            fixtures::create_transaction("0x02", "0x0abc", 2),
575        );
576
577        let tx2 = TxWithChanges::new(
578            HashMap::new(),
579            HashMap::new(),
580            HashMap::new(),
581            HashMap::new(),
582            HashMap::new(),
583            fixtures::create_transaction("0x01", "0x0abc", 1),
584        );
585
586        assert!(tx1.merge(tx2).is_err());
587    }
588}