eventify_primitives/networks/ethereum/
block.rs1use std::{fmt::Debug, hash::Hash};
2
3use alloy_primitives::{B256, U256};
4use eyre::Result;
5use redis::AsyncCommands;
6use sqlx::{Error as SqlError, FromRow};
7use utoipa::ToSchema;
8
9use crate::{
10 networks::{core::CoreBlock, NetworkKind, ResourceKind},
11 traits::{Block, Emit, Insert, Stream},
12 PropagateError,
13};
14
15#[derive(
16 Clone,
17 Debug,
18 Default,
19 serde::Deserialize,
20 serde::Serialize,
21 PartialEq,
22 Eq,
23 Hash,
24 FromRow,
25 ToSchema,
26)]
27pub struct EthBlock {
28 #[serde(flatten)]
29 core: CoreBlock,
30
31 #[serde(rename = "withdrawalsRoot")]
32 pub withdrawals_hash: Option<B256>,
33
34 #[serde(rename = "totalDifficulty")]
35 pub total_difficulty: Option<U256>,
36
37 #[serde(rename = "baseFeePerGas")]
39 pub base_fee: Option<U256>,
40
41 #[serde(rename = "blobGasUsed")]
43 pub blob_gas_used: Option<U256>,
44
45 #[serde(rename = "excessBlobGas")]
47 pub excess_blob_gas: Option<U256>,
48
49 #[serde(rename = "parentBeaconBlockRoot")]
51 pub parent_beacon_root: Option<B256>,
52}
53
54impl Block for EthBlock {
55 fn core(&self) -> &CoreBlock {
56 &self.core
57 }
58}
59
60impl Insert for EthBlock {
61 async fn insert(&self, pool: &sqlx::PgPool, _: &Option<B256>) -> Result<(), SqlError> {
62 let (
63 number,
64 hash,
65 parent_hash,
66 mix_digest,
67 uncle_hash,
68 receipt_hash,
69 root,
70 tx_hash,
71 coinbase,
72 nonce,
73 gas_used,
74 gas_limit,
75 difficulty,
76 extra,
77 bloom,
78 time,
79 ) = self.core().db_repr();
80
81 let withdrawals_hash = self.withdrawals_hash.as_ref().map(|v| v.as_slice());
82 let total_difficulty = self.total_difficulty.as_ref().map(|v| v.as_le_slice());
83 let base_fee = self.base_fee.map(|v| v.to::<i64>());
84 let parent_beacon_root = self.parent_beacon_root.as_ref().map(|v| v.as_slice());
85 let blob_gas_used = self.blob_gas_used.as_ref().map(|v| v.as_le_slice());
86 let excess_blob_gas = self.excess_blob_gas.as_ref().map(|v| v.as_le_slice());
87
88 let sql = r#"INSERT INTO block (
89 network,
90 number,
91 hash,
92 parent_hash,
93 mix_digest,
94 uncle_hash,
95 receipt_hash,
96 root,
97 tx_hash,
98 coinbase,
99 nonce,
100 gas_used,
101 gas_limit,
102 difficulty,
103 extra,
104 bloom,
105 time,
106
107 withdrawals_hash,
108 total_difficulty,
109 base_fee,
110 parent_beacon_root,
111 blob_gas_used,
112 excess_blob_gas
113 ) VALUES (
114 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23
115 ) ON CONFLICT DO NOTHING"#;
116
117 sqlx::query(sql)
118 .bind(NetworkKind::Ethereum)
119 .bind(number)
120 .bind(hash)
121 .bind(parent_hash)
122 .bind(mix_digest)
123 .bind(uncle_hash)
124 .bind(receipt_hash)
125 .bind(root)
126 .bind(tx_hash)
127 .bind(coinbase)
128 .bind(nonce)
129 .bind(gas_used)
130 .bind(gas_limit)
131 .bind(difficulty)
132 .bind(extra)
133 .bind(bloom)
134 .bind(time)
135 .bind(withdrawals_hash)
136 .bind(total_difficulty)
137 .bind(base_fee)
138 .bind(parent_beacon_root)
139 .bind(blob_gas_used)
140 .bind(excess_blob_gas)
141 .execute(pool)
142 .await?;
143
144 Ok(())
145 }
146}
147
148impl Emit for EthBlock {
149 async fn emit(
150 &self,
151 queue: &redis::Client,
152 network: &crate::networks::NetworkKind,
153 ) -> Result<(), PropagateError> {
154 let mut con = queue.get_async_connection().await?;
155
156 let channel = format!("{}:{}", network, ResourceKind::Block);
157 con.lpush(channel, serde_json::to_string(self)?).await?;
158
159 Ok(())
160 }
161}
162
163impl Stream for EthBlock {
164 async fn stream(
165 &self,
166 queue: &redis::Client,
167 network: &crate::networks::NetworkKind,
168 ) -> Result<(), PropagateError> {
169 let mut con = queue.get_async_connection().await?;
170
171 let key = format!("{}:{}", network, ResourceKind::Block);
172 let serialized_data = serde_json::to_string(self)?;
173
174 let res: redis::RedisResult<String> = redis::cmd("XADD")
175 .arg(&key)
176 .arg("MAXLEN")
177 .arg("~")
178 .arg("100000")
179 .arg("*")
180 .arg("payload")
181 .arg(serialized_data)
182 .query_async(&mut con)
183 .await;
184
185 match res {
186 Ok(_) => Ok(()),
187 Err(e) => Err(PropagateError::StreamError(e.to_string())),
188 }
189 }
190}
191
192#[cfg(test)]
193mod tests {
194 use super::*;
195
196 #[test]
197 fn deserialize_eth_block() {
198 let json = serde_json::json!(
199 {
200 "parentHash": "0xe21d9fc49e447805ab1f8cf3c647aa12bf8342c4418076ee9ef2e9fb8d551136",
201 "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
202 "miner": "0xe43cc5b6ff052f5aa931a4f9ef2bfa0c500014ca",
203 "stateRoot": "0x06d72f7ea43994c1ecc8c639367751c60c314efd8ad7d1ff45683b5db841ba09",
204 "transactionsRoot": "0x93069aa747ff207ed717eb530f1a297104473fb71cfa5422b6300581c656e02c",
205 "receiptsRoot": "0xb2e099561441fa9d7d25d69780d185eb00f5ca39033679ef827423ddc68b81f0",
206 "logsBloom": "0x2d3110858080012b80550082c0a412160001000064020408800522102ac019100040140842a8804090412300100c45040a61022008402c0702429080042b050128004288012088084c10180880d870a004008205e17001809489014000ec088005000080229009690200d100801808040320906104284e0c0601e450044a0004800091c0816a100020400242000a00c041600020210050280002a35b11900811c700202000822120001050805138008045400080240298081020042252a08c50130c000601930100100e00810286a0448260000205200018e8d200824010f00030f03001010010060000766328406081082a15624211406504a0089001c006a0",
207 "difficulty": "0x0",
208 "number": "0x128669b",
209 "gasLimit": "0x1c9c380",
210 "gasUsed": "0x377830",
211 "timestamp": "0x65f161eb",
212 "extraData": "0xd883010d0e846765746888676f312e32312e36856c696e7578",
213 "mixHash": "0xc282f24a1ac767946aafb8de743847a3561b4f0dc203e6e0093660670a77ffdc",
214 "nonce": "0x0000000000000000",
215 "baseFeePerGas": "0xb00d096a1",
216 "withdrawalsRoot": "0x4be4c436558be298a46793081f03ea74aaedd8fb5ac8ee90ab1eba42b1a38f35",
217 "blobGasUsed": null,
218 "excessBlobGas": null,
219 "parentBeaconBlockRoot": null,
220 "hash": "0xe2eb1899da1f3c73105cfd383de9f7792c9491a60d5ac1a61a68c521c0c53902"
221 }
222 );
223
224 assert!(serde_json::from_value::<EthBlock>(json).is_ok());
225 }
226
227 #[test]
228 fn deserialize_empty_eth_block() {
229 let json = serde_json::json!({});
230
231 assert!(serde_json::from_value::<EthBlock>(json).is_err());
232 }
233}