1use std::{
2 any::Any,
3 collections::{hash_map::Entry, HashMap},
4 sync::Arc,
5};
6
7use chrono::NaiveDateTime;
8use serde::{Deserialize, Serialize};
9use tracing::warn;
10
11use crate::{
12 models::{
13 contract::{AccountBalance, AccountChangesWithTx, AccountDelta},
14 protocol::{
15 ComponentBalance, ProtocolChangesWithTx, ProtocolComponent, ProtocolComponentStateDelta,
16 },
17 token::CurrencyToken,
18 Address, Chain, ComponentId, ExtractorIdentity, NormalisedMessage,
19 },
20 Bytes,
21};
22
/// A block on a given chain, identified by its number and hash.
///
/// Serializable via serde; embedded in [`BlockAggregatedChanges`] and
/// [`BlockTransactionDeltas`].
#[derive(Clone, Default, PartialEq, Serialize, Deserialize, Debug)]
pub struct Block {
    /// Block number.
    pub number: u64,
    /// Chain this block belongs to.
    pub chain: Chain,
    /// Hash of this block.
    pub hash: Bytes,
    /// Hash of the parent block.
    pub parent_hash: Bytes,
    /// Block timestamp. `NaiveDateTime` carries no timezone information
    /// (presumably UTC — confirm against the producers of this value).
    pub ts: NaiveDateTime,
}
31
32impl Block {
33 pub fn new(
34 number: u64,
35 chain: Chain,
36 hash: Bytes,
37 parent_hash: Bytes,
38 ts: NaiveDateTime,
39 ) -> Self {
40 Block { hash, parent_hash, number, chain, ts }
41 }
42}
43
/// A transaction, identified by its hash and located by block hash and index.
#[derive(Clone, Default, PartialEq, Debug)]
pub struct Transaction {
    /// Hash of the transaction.
    pub hash: Bytes,
    /// Hash of the block that contains this transaction.
    pub block_hash: Bytes,
    /// Sender address.
    pub from: Bytes,
    /// Recipient address, if any (presumably `None` for contract creations — confirm).
    pub to: Option<Bytes>,
    /// Index of the transaction; `TxWithChanges::merge` uses it to enforce ordering.
    pub index: u64,
}
52
53impl Transaction {
54 pub fn new(hash: Bytes, block_hash: Bytes, from: Bytes, to: Option<Bytes>, index: u64) -> Self {
55 Transaction { hash, block_hash, from, to, index }
56 }
57}
58
/// Per-transaction delta groups for a single block, generic over the change payload `T`.
pub struct BlockTransactionDeltas<T> {
    /// Name of the extractor associated with these deltas.
    pub extractor: String,
    /// Chain the block belongs to.
    pub chain: Chain,
    /// The block the deltas refer to.
    pub block: Block,
    /// Revert flag (presumably `true` when the deltas undo a reverted block — confirm).
    pub revert: bool,
    /// One [`TransactionDeltaGroup`] per entry; ordering is not enforced by this type.
    pub deltas: Vec<TransactionDeltaGroup<T>>,
}
66
/// Changes of type `T` grouped with the transaction they are attached to, plus
/// component maps keyed by `String` (presumably the component id — confirm).
///
/// NOTE(review): all fields are private and nothing in the visible part of this
/// file reads or constructs them, which is presumably why `dead_code` is allowed.
#[allow(dead_code)]
pub struct TransactionDeltaGroup<T> {
    /// The change payload.
    changes: T,
    /// Protocol components, keyed by `String`.
    protocol_component: HashMap<String, ProtocolComponent>,
    /// Component balances, keyed by `String`.
    component_balances: HashMap<String, ComponentBalance>,
    /// TVL values, keyed by `String`.
    component_tvl: HashMap<String, f64>,
    /// The transaction this group is attached to.
    tx: Transaction,
}
75
/// Changes aggregated for one block of one extractor.
///
/// Implements `NormalisedMessage`; its `drop_state` returns a copy with
/// `state_deltas` and `account_deltas` emptied.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct BlockAggregatedChanges {
    /// Name of the extractor; used together with `chain` to build the `ExtractorIdentity`.
    pub extractor: String,
    /// Chain the block belongs to.
    pub chain: Chain,
    /// The block these changes refer to.
    pub block: Block,
    /// Height of the finalized block (presumably at the time of emission — confirm).
    pub finalized_block_height: u64,
    /// Revert flag (presumably `true` when this message undoes a reverted block — confirm).
    pub revert: bool,
    /// Protocol state deltas, keyed by `String` (presumably the component id — confirm).
    pub state_deltas: HashMap<String, ProtocolComponentStateDelta>,
    /// Account deltas, keyed by `Bytes` (presumably the account address — confirm).
    pub account_deltas: HashMap<Bytes, AccountDelta>,
    /// Newly seen tokens, keyed by address.
    pub new_tokens: HashMap<Address, CurrencyToken>,
    /// Newly added protocol components, keyed by `String`.
    pub new_protocol_components: HashMap<String, ProtocolComponent>,
    /// Deleted protocol components, keyed by `String`.
    pub deleted_protocol_components: HashMap<String, ProtocolComponent>,
    /// Component balances: component id -> (`Bytes` key, presumably token address) -> balance.
    pub component_balances: HashMap<ComponentId, HashMap<Bytes, ComponentBalance>>,
    /// Account balances: address -> address -> balance
    /// (presumably account -> token — confirm).
    pub account_balances: HashMap<Address, HashMap<Address, AccountBalance>>,
    /// TVL values, keyed by `String` (presumably the component id — confirm).
    pub component_tvl: HashMap<String, f64>,
}
92
93impl BlockAggregatedChanges {
94 #[allow(clippy::too_many_arguments)]
95 pub fn new(
96 extractor: &str,
97 chain: Chain,
98 block: Block,
99 finalized_block_height: u64,
100 revert: bool,
101 state_deltas: HashMap<String, ProtocolComponentStateDelta>,
102 account_deltas: HashMap<Bytes, AccountDelta>,
103 new_tokens: HashMap<Address, CurrencyToken>,
104 new_components: HashMap<String, ProtocolComponent>,
105 deleted_components: HashMap<String, ProtocolComponent>,
106 component_balances: HashMap<ComponentId, HashMap<Bytes, ComponentBalance>>,
107 account_balances: HashMap<Address, HashMap<Address, AccountBalance>>,
108 component_tvl: HashMap<String, f64>,
109 ) -> Self {
110 Self {
111 extractor: extractor.to_string(),
112 chain,
113 block,
114 finalized_block_height,
115 revert,
116 state_deltas,
117 account_deltas,
118 new_tokens,
119 new_protocol_components: new_components,
120 deleted_protocol_components: deleted_components,
121 component_balances,
122 account_balances,
123 component_tvl,
124 }
125 }
126}
127
128impl std::fmt::Display for BlockAggregatedChanges {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 write!(f, "block_number: {}, extractor: {}", self.block.number, self.extractor)
131 }
132}
133
134#[typetag::serde]
135impl NormalisedMessage for BlockAggregatedChanges {
136 fn source(&self) -> ExtractorIdentity {
137 ExtractorIdentity::new(self.chain, &self.extractor)
138 }
139
140 fn drop_state(&self) -> Arc<dyn NormalisedMessage> {
141 Arc::new(Self {
142 extractor: self.extractor.clone(),
143 chain: self.chain,
144 block: self.block.clone(),
145 finalized_block_height: self.finalized_block_height,
146 revert: self.revert,
147 account_deltas: HashMap::new(),
148 state_deltas: HashMap::new(),
149 new_tokens: self.new_tokens.clone(),
150 new_protocol_components: self.new_protocol_components.clone(),
151 deleted_protocol_components: self.deleted_protocol_components.clone(),
152 component_balances: self.component_balances.clone(),
153 account_balances: self.account_balances.clone(),
154 component_tvl: self.component_tvl.clone(),
155 })
156 }
157
158 fn as_any(&self) -> &dyn Any {
159 self
160 }
161}
162
/// Implemented by types that are associated with a single [`Block`].
pub trait BlockScoped {
    /// Returns the block this value is associated with, as an owned [`Block`].
    fn block(&self) -> Block;
}
166
167impl BlockScoped for BlockAggregatedChanges {
168 fn block(&self) -> Block {
169 self.block.clone()
170 }
171}
172
/// All changes attached to one transaction.
///
/// Several values can be folded together with [`TxWithChanges::merge`], in which
/// case `tx` ends up being the transaction of the last merged value.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct TxWithChanges {
    /// Protocol components, keyed by component id.
    pub protocol_components: HashMap<ComponentId, ProtocolComponent>,
    /// Account deltas, keyed by address.
    pub account_deltas: HashMap<Address, AccountDelta>,
    /// Protocol state deltas, keyed by component id.
    pub state_updates: HashMap<ComponentId, ProtocolComponentStateDelta>,
    /// Component balances: component id -> address (presumably the token — confirm) -> balance.
    pub balance_changes: HashMap<ComponentId, HashMap<Address, ComponentBalance>>,
    /// Account balances: address -> address -> balance
    /// (presumably account -> token — confirm).
    pub account_balance_changes: HashMap<Address, HashMap<Address, AccountBalance>>,
    /// The transaction these changes are attached to.
    pub tx: Transaction,
}
183
184impl TxWithChanges {
185 pub fn new(
186 protocol_components: HashMap<ComponentId, ProtocolComponent>,
187 account_deltas: HashMap<Address, AccountDelta>,
188 protocol_states: HashMap<ComponentId, ProtocolComponentStateDelta>,
189 balance_changes: HashMap<ComponentId, HashMap<Address, ComponentBalance>>,
190 account_balance_changes: HashMap<Address, HashMap<Address, AccountBalance>>,
191 tx: Transaction,
192 ) -> Self {
193 Self {
194 account_deltas,
195 protocol_components,
196 state_updates: protocol_states,
197 balance_changes,
198 account_balance_changes,
199 tx,
200 }
201 }
202
203 pub fn merge(&mut self, other: TxWithChanges) -> Result<(), String> {
214 if self.tx.block_hash != other.tx.block_hash {
215 return Err(format!(
216 "Can't merge TxWithChanges from different blocks: 0x{:x} != 0x{:x}",
217 self.tx.block_hash, other.tx.block_hash,
218 ));
219 }
220 if self.tx.hash == other.tx.hash {
221 return Err(format!(
222 "Can't merge TxWithChanges from the same transaction: 0x{:x}",
223 self.tx.hash
224 ));
225 }
226 if self.tx.index > other.tx.index {
227 return Err(format!(
228 "Can't merge TxWithChanges with lower transaction index: {} > {}",
229 self.tx.index, other.tx.index
230 ));
231 }
232
233 self.tx = other.tx;
234
235 for (key, value) in other.protocol_components {
239 match self.protocol_components.entry(key) {
240 Entry::Occupied(mut entry) => {
241 warn!(
242 "Overwriting new protocol component for id {} with a new one. This should never happen! Please check logic",
243 entry.get().id
244 );
245 entry.insert(value);
246 }
247 Entry::Vacant(entry) => {
248 entry.insert(value);
249 }
250 }
251 }
252
253 for (address, update) in other.account_deltas.clone().into_iter() {
255 match self.account_deltas.entry(address) {
256 Entry::Occupied(mut e) => {
257 e.get_mut().merge(update)?;
258 }
259 Entry::Vacant(e) => {
260 e.insert(update);
261 }
262 }
263 }
264
265 for (key, value) in other.state_updates {
267 match self.state_updates.entry(key) {
268 Entry::Occupied(mut entry) => {
269 entry.get_mut().merge(value)?;
270 }
271 Entry::Vacant(entry) => {
272 entry.insert(value);
273 }
274 }
275 }
276
277 for (component_id, balance_changes) in other.balance_changes {
279 let token_balances = self
280 .balance_changes
281 .entry(component_id)
282 .or_default();
283 for (token, balance) in balance_changes {
284 token_balances.insert(token, balance);
285 }
286 }
287
288 for (account_addr, balance_changes) in other.account_balance_changes {
290 let token_balances = self
291 .account_balance_changes
292 .entry(account_addr)
293 .or_default();
294 for (token, balance) in balance_changes {
295 token_balances.insert(token, balance);
296 }
297 }
298
299 Ok(())
300 }
301}
302
303impl From<AccountChangesWithTx> for TxWithChanges {
304 fn from(value: AccountChangesWithTx) -> Self {
305 Self {
306 protocol_components: value.protocol_components,
307 account_deltas: value.account_deltas,
308 state_updates: HashMap::new(),
309 balance_changes: value.component_balances,
310 account_balance_changes: value.account_balances,
311 tx: value.tx,
312 }
313 }
314}
315
316impl From<ProtocolChangesWithTx> for TxWithChanges {
317 fn from(value: ProtocolChangesWithTx) -> Self {
318 Self {
319 protocol_components: value.new_protocol_components,
320 account_deltas: HashMap::new(),
321 state_updates: value.protocol_states,
322 balance_changes: value.balance_changes,
323 account_balance_changes: HashMap::new(),
324 tx: value.tx,
325 }
326 }
327}
328
/// Selects a block either by a symbolic tag or by an explicit number.
///
/// NOTE(review): the variant names match the block tags used by Ethereum
/// JSON-RPC; the per-variant descriptions below assume those semantics — confirm.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum BlockTag {
    /// Presumably the most recent finalized block.
    Finalized,
    /// Presumably the most recent safe block.
    Safe,
    /// Presumably the most recent block.
    Latest,
    /// Presumably the earliest available block.
    Earliest,
    /// Presumably the pending (not yet included) block.
    Pending,
    /// A block addressed by the wrapped `u64` number.
    Number(u64),
}
344
#[cfg(test)]
pub mod fixtures {
    use std::str::FromStr;

    use super::*;
    use crate::models::ChangeType;

    /// Fixture transaction built from `Bytes::zero` values (32 for both hashes,
    /// 20 for both addresses) with index 10.
    pub fn transaction01() -> Transaction {
        let hash = Bytes::zero(32);
        let address = Bytes::zero(20);
        Transaction::new(hash.clone(), hash, address.clone(), Some(address), 10)
    }

    /// Builds a transaction from a parsable hash and block hash plus an index.
    ///
    /// Sender and recipient are both `Bytes::zero(20)`. Panics if either string
    /// fails to parse.
    pub fn create_transaction(hash: &str, block: &str, index: u64) -> Transaction {
        let tx_hash: Bytes = hash.parse().unwrap();
        let block_hash: Bytes = block.parse().unwrap();
        Transaction::new(tx_hash, block_hash, Bytes::zero(20), Some(Bytes::zero(20)), index)
    }

    /// A `TxWithChanges` that carries no changes, only the given transaction.
    fn empty_changes(hash: &str, block: &str, index: u64) -> TxWithChanges {
        TxWithChanges { tx: create_transaction(hash, block, index), ..Default::default() }
    }

    #[test]
    fn test_merge_tx_with_changes() {
        let component_id = "ambient_USDC_ETH".to_string();
        let base_token = Bytes::from_str("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
        let quote_token = Bytes::from_str("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();
        let contract_addr = Bytes::from_str("aaaaaaaaa24eeeb8d57d431224f73832bc34f688").unwrap();
        let tx_hash0 = "0x2f6350a292c0fc918afe67cb893744a080dacb507b0cea4cc07437b8aff23cdb";
        let tx_hash1 = "0x0d9e0da36cf9f305a189965b248fc79c923619801e8ab5ef158d4fd528a291ad";
        let block = "0x0000000000000000000000000000000000000000000000000000000000000000";

        // Both inputs describe the same component, contract and token; they only
        // differ in the transaction (hash and index) and the balance amount.
        let make_changes = |tx_hash: &str, index: u64, amount: u64| -> TxWithChanges {
            let tx_bytes = Bytes::from_str(tx_hash).unwrap();

            let component = ProtocolComponent {
                id: component_id.clone(),
                protocol_system: "test".to_string(),
                protocol_type_name: "vm:pool".to_string(),
                chain: Chain::Ethereum,
                tokens: vec![base_token.clone(), quote_token.clone()],
                contract_addresses: vec![contract_addr.clone()],
                static_attributes: Default::default(),
                change: Default::default(),
                creation_tx: tx_bytes.clone(),
                created_at: Default::default(),
            };
            let account_delta = AccountDelta::new(
                Chain::Ethereum,
                contract_addr.clone(),
                HashMap::new(),
                None,
                Some(vec![0, 0, 0, 0].into()),
                ChangeType::Creation,
            );
            let component_balance = ComponentBalance {
                token: base_token.clone(),
                balance: Bytes::from(amount).lpad(32, 0),
                balance_float: amount as f64,
                component_id: component_id.clone(),
                modify_tx: tx_bytes.clone(),
            };
            let account_balance = AccountBalance {
                token: base_token.clone(),
                balance: Bytes::from(amount).lpad(32, 0),
                modify_tx: tx_bytes,
                account: contract_addr.clone(),
            };

            TxWithChanges::new(
                HashMap::from([(component_id.clone(), component)]),
                HashMap::from([(contract_addr.clone(), account_delta)]),
                HashMap::new(),
                HashMap::from([(
                    component_id.clone(),
                    HashMap::from([(base_token.clone(), component_balance)]),
                )]),
                HashMap::from([(
                    contract_addr.clone(),
                    HashMap::from([(base_token.clone(), account_balance)]),
                )]),
                create_transaction(tx_hash, block, index),
            )
        };

        let mut merged = make_changes(tx_hash0, 1, 800);
        let later = make_changes(tx_hash1, 2, 1000);

        assert!(merged.merge(later).is_ok());

        // The balances and the transaction of the later input must win.
        let expected_balance = Bytes::from(1000_u64).lpad(32, 0);

        let account_balance = merged
            .account_balance_changes
            .get(&contract_addr)
            .and_then(|by_token| by_token.get(&base_token))
            .unwrap();
        assert_eq!(account_balance.balance, expected_balance);

        let component_balance = merged
            .balance_changes
            .get(&component_id)
            .and_then(|by_token| by_token.get(&base_token))
            .unwrap();
        assert_eq!(component_balance.balance, expected_balance);

        assert_eq!(merged.tx.hash, Bytes::from_str(tx_hash1).unwrap());
    }

    #[test]
    fn test_merge_different_blocks() {
        let mut first = empty_changes("0x01", "0x0abc", 1);
        let second = empty_changes("0x02", "0x0def", 2);

        assert!(first.merge(second).is_err());
    }

    #[test]
    fn test_merge_same_transaction() {
        let mut first = empty_changes("0x01", "0x0abc", 1);
        let second = empty_changes("0x01", "0x0abc", 1);

        assert!(first.merge(second).is_err());
    }

    #[test]
    fn test_merge_lower_transaction_index() {
        let mut first = empty_changes("0x02", "0x0abc", 2);
        let second = empty_changes("0x01", "0x0abc", 1);

        assert!(first.merge(second).is_err());
    }
}