eventify_primitives/networks/ethereum/
block.rs

1use std::{fmt::Debug, hash::Hash};
2
3use alloy_primitives::{B256, U256};
4use eyre::Result;
5use redis::AsyncCommands;
6use sqlx::{Error as SqlError, FromRow};
7use utoipa::ToSchema;
8
9use crate::{
10    networks::{core::CoreBlock, NetworkKind, ResourceKind},
11    traits::{Block, Emit, Insert, Stream},
12    PropagateError,
13};
14
15#[derive(
16    Clone,
17    Debug,
18    Default,
19    serde::Deserialize,
20    serde::Serialize,
21    PartialEq,
22    Eq,
23    Hash,
24    FromRow,
25    ToSchema,
26)]
27pub struct EthBlock {
28    #[serde(flatten)]
29    core: CoreBlock,
30
31    #[serde(rename = "withdrawalsRoot")]
32    pub withdrawals_hash: Option<B256>,
33
34    #[serde(rename = "totalDifficulty")]
35    pub total_difficulty: Option<U256>,
36
37    /// added by EIP-1559
38    #[serde(rename = "baseFeePerGas")]
39    pub base_fee: Option<U256>,
40
41    /// added by EIP-4844
42    #[serde(rename = "blobGasUsed")]
43    pub blob_gas_used: Option<U256>,
44
45    /// added by EIP-4844
46    #[serde(rename = "excessBlobGas")]
47    pub excess_blob_gas: Option<U256>,
48
49    /// added by EIP-4788
50    #[serde(rename = "parentBeaconBlockRoot")]
51    pub parent_beacon_root: Option<B256>,
52}
53
54impl Block for EthBlock {
55    fn core(&self) -> &CoreBlock {
56        &self.core
57    }
58}
59
60impl Insert for EthBlock {
61    async fn insert(&self, pool: &sqlx::PgPool, _: &Option<B256>) -> Result<(), SqlError> {
62        let (
63            number,
64            hash,
65            parent_hash,
66            mix_digest,
67            uncle_hash,
68            receipt_hash,
69            root,
70            tx_hash,
71            coinbase,
72            nonce,
73            gas_used,
74            gas_limit,
75            difficulty,
76            extra,
77            bloom,
78            time,
79        ) = self.core().db_repr();
80
81        let withdrawals_hash = self.withdrawals_hash.as_ref().map(|v| v.as_slice());
82        let total_difficulty = self.total_difficulty.as_ref().map(|v| v.as_le_slice());
83        let base_fee = self.base_fee.map(|v| v.to::<i64>());
84        let parent_beacon_root = self.parent_beacon_root.as_ref().map(|v| v.as_slice());
85        let blob_gas_used = self.blob_gas_used.as_ref().map(|v| v.as_le_slice());
86        let excess_blob_gas = self.excess_blob_gas.as_ref().map(|v| v.as_le_slice());
87
88        let sql = r#"INSERT INTO block (
89            network,
90            number,
91            hash,
92            parent_hash,
93            mix_digest,
94            uncle_hash,
95            receipt_hash,
96            root,
97            tx_hash,
98            coinbase,
99            nonce,
100            gas_used,
101            gas_limit,
102            difficulty,
103            extra,
104            bloom,
105            time,
106
107            withdrawals_hash,
108            total_difficulty,
109            base_fee,
110            parent_beacon_root,
111            blob_gas_used,
112            excess_blob_gas
113            ) VALUES (
114                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23
115            ) ON CONFLICT DO NOTHING"#;
116
117        sqlx::query(sql)
118            .bind(NetworkKind::Ethereum)
119            .bind(number)
120            .bind(hash)
121            .bind(parent_hash)
122            .bind(mix_digest)
123            .bind(uncle_hash)
124            .bind(receipt_hash)
125            .bind(root)
126            .bind(tx_hash)
127            .bind(coinbase)
128            .bind(nonce)
129            .bind(gas_used)
130            .bind(gas_limit)
131            .bind(difficulty)
132            .bind(extra)
133            .bind(bloom)
134            .bind(time)
135            .bind(withdrawals_hash)
136            .bind(total_difficulty)
137            .bind(base_fee)
138            .bind(parent_beacon_root)
139            .bind(blob_gas_used)
140            .bind(excess_blob_gas)
141            .execute(pool)
142            .await?;
143
144        Ok(())
145    }
146}
147
148impl Emit for EthBlock {
149    async fn emit(
150        &self,
151        queue: &redis::Client,
152        network: &crate::networks::NetworkKind,
153    ) -> Result<(), PropagateError> {
154        let mut con = queue.get_async_connection().await?;
155
156        let channel = format!("{}:{}", network, ResourceKind::Block);
157        con.lpush(channel, serde_json::to_string(self)?).await?;
158
159        Ok(())
160    }
161}
162
163impl Stream for EthBlock {
164    async fn stream(
165        &self,
166        queue: &redis::Client,
167        network: &crate::networks::NetworkKind,
168    ) -> Result<(), PropagateError> {
169        let mut con = queue.get_async_connection().await?;
170
171        let key = format!("{}:{}", network, ResourceKind::Block);
172        let serialized_data = serde_json::to_string(self)?;
173
174        let res: redis::RedisResult<String> = redis::cmd("XADD")
175            .arg(&key)
176            .arg("MAXLEN")
177            .arg("~")
178            .arg("100000")
179            .arg("*")
180            .arg("payload")
181            .arg(serialized_data)
182            .query_async(&mut con)
183            .await;
184
185        match res {
186            Ok(_) => Ok(()),
187            Err(e) => Err(PropagateError::StreamError(e.to_string())),
188        }
189    }
190}
191
192#[cfg(test)]
193mod tests {
194    use super::*;
195
196    #[test]
197    fn deserialize_eth_block() {
198        let json = serde_json::json!(
199            {
200                "parentHash": "0xe21d9fc49e447805ab1f8cf3c647aa12bf8342c4418076ee9ef2e9fb8d551136",
201                "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
202                "miner": "0xe43cc5b6ff052f5aa931a4f9ef2bfa0c500014ca",
203                "stateRoot": "0x06d72f7ea43994c1ecc8c639367751c60c314efd8ad7d1ff45683b5db841ba09",
204                "transactionsRoot": "0x93069aa747ff207ed717eb530f1a297104473fb71cfa5422b6300581c656e02c",
205                "receiptsRoot": "0xb2e099561441fa9d7d25d69780d185eb00f5ca39033679ef827423ddc68b81f0",
206                "logsBloom": "0x2d3110858080012b80550082c0a412160001000064020408800522102ac019100040140842a8804090412300100c45040a61022008402c0702429080042b050128004288012088084c10180880d870a004008205e17001809489014000ec088005000080229009690200d100801808040320906104284e0c0601e450044a0004800091c0816a100020400242000a00c041600020210050280002a35b11900811c700202000822120001050805138008045400080240298081020042252a08c50130c000601930100100e00810286a0448260000205200018e8d200824010f00030f03001010010060000766328406081082a15624211406504a0089001c006a0",
207                "difficulty": "0x0",
208                "number": "0x128669b",
209                "gasLimit": "0x1c9c380",
210                "gasUsed": "0x377830",
211                "timestamp": "0x65f161eb",
212                "extraData": "0xd883010d0e846765746888676f312e32312e36856c696e7578",
213                "mixHash": "0xc282f24a1ac767946aafb8de743847a3561b4f0dc203e6e0093660670a77ffdc",
214                "nonce": "0x0000000000000000",
215                "baseFeePerGas": "0xb00d096a1",
216                "withdrawalsRoot": "0x4be4c436558be298a46793081f03ea74aaedd8fb5ac8ee90ab1eba42b1a38f35",
217                "blobGasUsed": null,
218                "excessBlobGas": null,
219                "parentBeaconBlockRoot": null,
220                "hash": "0xe2eb1899da1f3c73105cfd383de9f7792c9491a60d5ac1a61a68c521c0c53902"
221              }
222        );
223
224        assert!(serde_json::from_value::<EthBlock>(json).is_ok());
225    }
226
227    #[test]
228    fn deserialize_empty_eth_block() {
229        let json = serde_json::json!({});
230
231        assert!(serde_json::from_value::<EthBlock>(json).is_err());
232    }
233}