1use std::{fmt::Debug, hash::Hash};
2
3use alloy_primitives::{Address, Bytes, B256, B64, U256, U64};
4use eyre::Result;
5use redis::AsyncCommands;
6use sqlx::{Error as SqlError, FromRow};
7use utoipa::ToSchema;
8
9use crate::{
10 networks::{LogKind, NetworkKind, ResourceKind},
11 PropagateError,
12};
13
14#[derive(
15 Clone,
16 Debug,
17 Default,
18 serde::Deserialize,
19 serde::Serialize,
20 PartialEq,
21 Eq,
22 Hash,
23 FromRow,
24 ToSchema,
25)]
26pub struct CoreBlock {
27 pub number: Option<U64>,
28 pub hash: Option<B256>,
29 #[serde(rename = "parentHash")]
30 pub parent_hash: B256,
31 #[serde(rename = "mixHash")]
32 pub mix_digest: Option<B256>,
33 #[serde(rename = "sha3Uncles")]
34 pub uncle_hash: B256,
35 #[serde(rename = "receiptsRoot")]
36 pub receipt_hash: B256,
37 #[serde(rename = "stateRoot")]
38 pub root: B256,
39 #[serde(rename = "transactionsRoot")]
40 pub tx_hash: B256,
41 #[serde(rename = "miner")]
42 pub coinbase: Address,
43 pub nonce: Option<B64>,
44 #[serde(rename = "gasUsed")]
45 pub gas_used: U256,
46 #[serde(rename = "gasLimit")]
47 pub gas_limit: U256,
48 pub difficulty: U256,
49 #[serde(rename = "extraData")]
50 pub extra: Bytes,
51 #[serde(rename = "logsBloom")]
52 pub bloom: Option<Bytes>,
53 #[serde(rename = "timestamp")]
54 pub time: U256,
55}
56
57impl CoreBlock {
58 pub fn number(&self) -> Option<U64> {
59 self.number
60 }
61
62 pub fn hash(&self) -> Option<B256> {
63 self.hash
64 }
65
66 pub fn parent_hash(&self) -> B256 {
67 self.parent_hash
68 }
69
70 pub fn mix_digest(&self) -> Option<B256> {
71 self.mix_digest
72 }
73
74 pub fn uncle_hash(&self) -> B256 {
75 self.uncle_hash
76 }
77
78 pub fn receipt_hash(&self) -> B256 {
79 self.receipt_hash
80 }
81
82 pub fn root(&self) -> B256 {
83 self.root
84 }
85
86 pub fn tx_hash(&self) -> B256 {
87 self.tx_hash
88 }
89
90 pub fn coinbase(&self) -> Address {
91 self.coinbase
92 }
93
94 pub fn nonce(&self) -> Option<B64> {
95 self.nonce
96 }
97
98 pub fn gas_used(&self) -> U256 {
99 self.gas_used
100 }
101
102 pub fn gas_limit(&self) -> U256 {
103 self.gas_limit
104 }
105
106 pub fn difficulty(&self) -> U256 {
107 self.difficulty
108 }
109
110 pub fn extra(&self) -> Bytes {
111 self.extra.clone()
112 }
113
114 pub fn bloom(&self) -> Option<Bytes> {
115 self.bloom.clone()
116 }
117
118 pub fn time(&self) -> U256 {
119 self.time
120 }
121
122 pub async fn insert(&self, pool: &sqlx::PgPool, network: NetworkKind) -> Result<(), SqlError> {
123 let number = self.number.map(|v| v.to::<i64>());
124 let hash = self.hash.as_ref().map(|v| v.as_slice());
125 let parent_hash = self.parent_hash.as_slice();
126 let mix_digest = self.mix_digest.as_ref().map(|v| v.as_slice());
127 let uncle_hash = self.uncle_hash.as_slice();
128 let receipt_hash = self.receipt_hash.as_slice();
129 let root = self.root.as_slice();
130 let tx_hash = self.tx_hash.as_slice();
131 let coinbase = self.coinbase.as_slice();
132 let nonce = self.nonce.as_ref().map(|v| v.as_slice());
133 let gas_used = self.gas_used.as_le_slice();
134 let gas_limit = self.gas_limit.as_le_slice();
135 let difficulty = self.difficulty.as_le_slice();
136 let extra = self.extra.to_vec();
137 let bloom = self.bloom.as_ref().map(|v| v.to_vec());
138 let time = self.time.to::<i64>();
139
140 let query = r#"
141 INSERT INTO block (
142 network,
143
144 number,
145 hash,
146 parent_hash,
147 mix_digest,
148 uncle_hash,
149 receipt_hash,
150 root,
151 tx_hash,
152 coinbase,
153 nonce,
154 gas_used,
155 gas_limit,
156 difficulty,
157 extra,
158 bloom,
159 time
160 )
161 VALUES (
162 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17
163 ) ON CONFLICT DO NOTHING
164 "#;
165
166 sqlx::query(query)
167 .bind(network)
168 .bind(number)
169 .bind(hash)
170 .bind(parent_hash)
171 .bind(mix_digest)
172 .bind(uncle_hash)
173 .bind(receipt_hash)
174 .bind(root)
175 .bind(tx_hash)
176 .bind(coinbase)
177 .bind(nonce)
178 .bind(gas_used)
179 .bind(gas_limit)
180 .bind(difficulty)
181 .bind(extra)
182 .bind(bloom)
183 .bind(time)
184 .execute(pool)
185 .await?;
186
187 Ok(())
188 }
189
190 pub async fn emit(
191 &self,
192 queue: &redis::Client,
193 network: &NetworkKind,
194 ) -> Result<(), PropagateError> {
195 let mut con = queue.get_async_connection().await?;
196
197 let channel = format!("{}:{}", network, ResourceKind::Block);
198 con.lpush(channel, serde_json::to_string(self)?).await?;
199
200 Ok(())
201 }
202
203 pub async fn stream(
204 &self,
205 queue: &redis::Client,
206 network: &crate::networks::NetworkKind,
207 ) -> Result<(), PropagateError> {
208 let mut con = queue.get_async_connection().await?;
209
210 let key = format!("{}:{}", network, ResourceKind::Block);
211 let serialized_data = serde_json::to_string(self)?;
212
213 let res: redis::RedisResult<String> = redis::cmd("XADD")
214 .arg(&key)
215 .arg("MAXLEN")
216 .arg("~")
217 .arg("100000")
218 .arg("*")
219 .arg("payload")
220 .arg(serialized_data)
221 .query_async(&mut con)
222 .await;
223
224 match res {
225 Ok(_) => Ok(()),
226 Err(e) => Err(PropagateError::StreamError(e.to_string())),
227 }
228 }
229
230 #[allow(clippy::type_complexity)]
231 pub(crate) fn db_repr(
232 &self,
233 ) -> (
234 Option<i64>,
235 Option<&[u8]>,
236 &[u8],
237 Option<&[u8]>,
238 &[u8],
239 &[u8],
240 &[u8],
241 &[u8],
242 &[u8],
243 Option<&[u8]>,
244 &[u8],
245 &[u8],
246 &[u8],
247 Vec<u8>,
248 Option<Vec<u8>>,
249 i64,
250 ) {
251 let number = self.number.map(|v| v.to::<i64>());
252 let hash = self.hash.as_ref().map(|v| v.as_slice());
253 let parent_hash = self.parent_hash.as_slice();
254 let mix_digest = self.mix_digest.as_ref().map(|v| v.as_slice());
255 let uncle_hash = self.uncle_hash.as_slice();
256 let receipt_hash = self.receipt_hash.as_slice();
257 let root = self.root.as_slice();
258 let tx_hash = self.tx_hash.as_slice();
259 let coinbase = self.coinbase.as_slice();
260 let nonce = self.nonce.as_ref().map(|v| v.as_slice());
261 let gas_used = self.gas_used.as_le_slice();
262 let gas_limit = self.gas_limit.as_le_slice();
263 let difficulty = self.difficulty.as_le_slice();
264 let extra = self.extra.to_vec();
265 let bloom = self.bloom.as_ref().map(|v| v.to_vec());
266 let time = self.time.to::<i64>();
267
268 (
269 number,
270 hash,
271 parent_hash,
272 mix_digest,
273 uncle_hash,
274 receipt_hash,
275 root,
276 tx_hash,
277 coinbase,
278 nonce,
279 gas_used,
280 gas_limit,
281 difficulty,
282 extra,
283 bloom,
284 time,
285 )
286 }
287}
288
289#[derive(
290 Clone,
291 Debug,
292 Default,
293 serde::Deserialize,
294 serde::Serialize,
295 PartialEq,
296 Eq,
297 Hash,
298 FromRow,
299 ToSchema,
300)]
301pub struct CoreLog {
302 pub address: Address,
303 #[serde(rename = "blockHash")]
304 pub block_hash: Option<B256>,
305 #[serde(rename = "blockNumber")]
306 pub block_number: Option<U64>,
307 pub data: Bytes,
308 #[serde(rename = "logIndex")]
309 pub log_index: Option<U64>,
310 pub removed: bool,
311 pub topics: Vec<B256>,
312 #[serde(rename = "transactionIndex")]
313 pub tx_index: Option<U64>,
314 #[serde(rename = "transactionHash")]
315 pub tx_hash: Option<B256>,
316}
317
318impl CoreLog {
319 pub async fn insert(&self, pool: &sqlx::PgPool, network: NetworkKind) -> Result<(), SqlError> {
320 let address = self.address.as_slice();
321 let block_hash = self.block_hash.as_ref().map(|v| v.as_slice());
322 let block_number = self.block_number.map(|v| v.to::<i64>());
323 let data = self.data.0.as_ref();
324 let log_index = self.log_index.map(|v| v.to::<i64>());
325 let removed = self.removed;
326 let topic0 = self.topics.first().map(|v| v.as_slice());
327 let topic1 = self.topics.get(1).map(|v| v.as_slice());
328 let topic2 = self.topics.get(2).map(|v| v.as_slice());
329 let topic3 = self.topics.get(3).map(|v| v.as_slice());
330 let tx_index = self.tx_index.map(|v| v.to::<i64>());
331 let tx_hash = self.tx_hash.as_ref().map(|v| v.as_slice());
332
333 let query = r#"
334 INSERT INTO log (
335 network,
336 address,
337 block_hash,
338 block_number,
339 data,
340 log_index,
341 removed,
342 topic0,
343 topic1,
344 topic2,
345 topic3,
346 tx_index,
347 tx_hash
348 )
349 VALUES (
350 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
351 ) ON CONFLICT DO NOTHING
352 "#;
353
354 sqlx::query(query)
355 .bind(network)
356 .bind(address)
357 .bind(block_hash)
358 .bind(block_number)
359 .bind(data)
360 .bind(log_index)
361 .bind(removed)
362 .bind(topic0)
363 .bind(topic1)
364 .bind(topic2)
365 .bind(topic3)
366 .bind(tx_index)
367 .bind(tx_hash)
368 .execute(pool)
369 .await?;
370
371 Ok(())
372 }
373
374 pub async fn emit(
375 &self,
376 queue: &redis::Client,
377 network: &NetworkKind,
378 ) -> Result<(), PropagateError> {
379 let mut con = queue.get_async_connection().await?;
380
381 let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::Raw));
382 con.lpush(channel, serde_json::to_string(self)?).await?;
383
384 Ok(())
385 }
386
387 pub async fn stream(
388 &self,
389 queue: &redis::Client,
390 network: &crate::networks::NetworkKind,
391 ) -> Result<(), PropagateError> {
392 let mut con = queue.get_async_connection().await?;
393
394 let key = format!("{}:{}", network, ResourceKind::Log(LogKind::Raw));
395 let serialized_data = serde_json::to_string(self)?;
396
397 let res: redis::RedisResult<String> = redis::cmd("XADD")
398 .arg(&key)
399 .arg("MAXLEN")
400 .arg("~")
401 .arg("100000")
402 .arg("*")
403 .arg("payload")
404 .arg(serialized_data)
405 .query_async(&mut con)
406 .await;
407
408 match res {
409 Ok(_) => Ok(()),
410 Err(e) => Err(PropagateError::StreamError(e.to_string())),
411 }
412 }
413
414 #[allow(clippy::type_complexity)]
415 pub(crate) fn db_repr(
416 &self,
417 ) -> (
418 &[u8],
419 Option<&[u8]>,
420 Option<i64>,
421 &[u8],
422 Option<i64>,
423 bool,
424 Option<&[u8]>,
425 Option<&[u8]>,
426 Option<&[u8]>,
427 Option<&[u8]>,
428 Option<i64>,
429 Option<&[u8]>,
430 ) {
431 let address = self.address.as_slice();
432 let block_hash = self.block_hash.as_ref().map(|v| v.as_slice());
433 let block_number = self.block_number.map(|v| v.to::<i64>());
434 let data = self.data.0.as_ref();
435 let log_index = self.log_index.map(|v| v.to::<i64>());
436 let removed = self.removed;
437 let topic0 = self.topics.first().map(|v| v.as_slice());
438 let topic1 = self.topics.get(1).map(|v| v.as_slice());
439 let topic2 = self.topics.get(2).map(|v| v.as_slice());
440 let topic3 = self.topics.get(3).map(|v| v.as_slice());
441 let tx_index = self.tx_index.map(|v| v.to::<i64>());
442 let tx_hash = self.tx_hash.as_ref().map(|v| v.as_slice());
443
444 (
445 address,
446 block_hash,
447 block_number,
448 data,
449 log_index,
450 removed,
451 topic0,
452 topic1,
453 topic2,
454 topic3,
455 tx_index,
456 tx_hash,
457 )
458 }
459}
460
461#[cfg(test)]
462mod tests {
463 use super::*;
464
465 #[test]
466 fn deserialize_core_block() {
467 let json = serde_json::json!({
468 "hash": "0x4debecd96c87bd9be70b2a428d1e2d537e7f3ce77e353a7f031b4b66fb4d12eb",
469 "parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
470 "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
471 "miner": "0x0000000000000000000000000000000000000000",
472 "stateRoot": "0x0000000000000000000000000000000000000000000000000000000000000000",
473 "transactionsRoot": "0x0000000000000000000000000000000000000000000000000000000000000000",
474 "receiptsRoot": "0x0000000000000000000000000000000000000000000000000000000000000000",
475 "number": "0x1155a9",
476 "gasUsed": "0x0",
477 "gasLimit": "0x0",
478 "extraData": "0x",
479 "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
480 "timestamp": "0x65f06454",
481 "difficulty": "0x0",
482 "mixHash": null,
483 "nonce": null
484 });
485
486 assert!(serde_json::from_value::<CoreBlock>(json).is_ok());
487 assert!(serde_json::from_value::<CoreBlock>(serde_json::json!({})).is_err());
488 }
489
490 #[test]
491 fn deserialize_core_log() {
492 let json = serde_json::json!(
493 {
494 "address": "0xdac17f958d2ee523a2206206994597c13d831ec7",
495 "blockHash": "0xfad3e899227b47062b71c90e61eeb056a43052be544bc006031b10df8abc92f4",
496 "blockNumber": "0x1286817",
497 "data": "0x0000000000000000000000000000000000000000000000000000000077359400",
498 "logIndex": "0x75",
499 "removed": false,
500 "topics": [
501 "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
502 "0x0000000000000000000000004d06a4779ae0ed965598a1ef2b86b95a41ad7e81",
503 "0x00000000000000000000000011235534a66a33c366b84933d5202c841539d1c9"
504 ],
505 "transactionHash": "0x8acd636a4e0a0165bfbf003aa202a87b1a8e17e05183650ad39415861555aa6e",
506 "transactionIndex": "0x82"
507 }
508 );
509
510 assert!(serde_json::from_value::<CoreLog>(json).is_ok());
511 assert!(serde_json::from_value::<CoreLog>(serde_json::json!({})).is_err());
512 }
513}