eventify_primitives/networks/
core.rs

1use std::{fmt::Debug, hash::Hash};
2
3use alloy_primitives::{Address, Bytes, B256, B64, U256, U64};
4use eyre::Result;
5use redis::AsyncCommands;
6use sqlx::{Error as SqlError, FromRow};
7use utoipa::ToSchema;
8
9use crate::{
10    networks::{LogKind, NetworkKind, ResourceKind},
11    PropagateError,
12};
13
14#[derive(
15    Clone,
16    Debug,
17    Default,
18    serde::Deserialize,
19    serde::Serialize,
20    PartialEq,
21    Eq,
22    Hash,
23    FromRow,
24    ToSchema,
25)]
26pub struct CoreBlock {
27    pub number: Option<U64>,
28    pub hash: Option<B256>,
29    #[serde(rename = "parentHash")]
30    pub parent_hash: B256,
31    #[serde(rename = "mixHash")]
32    pub mix_digest: Option<B256>,
33    #[serde(rename = "sha3Uncles")]
34    pub uncle_hash: B256,
35    #[serde(rename = "receiptsRoot")]
36    pub receipt_hash: B256,
37    #[serde(rename = "stateRoot")]
38    pub root: B256,
39    #[serde(rename = "transactionsRoot")]
40    pub tx_hash: B256,
41    #[serde(rename = "miner")]
42    pub coinbase: Address,
43    pub nonce: Option<B64>,
44    #[serde(rename = "gasUsed")]
45    pub gas_used: U256,
46    #[serde(rename = "gasLimit")]
47    pub gas_limit: U256,
48    pub difficulty: U256,
49    #[serde(rename = "extraData")]
50    pub extra: Bytes,
51    #[serde(rename = "logsBloom")]
52    pub bloom: Option<Bytes>,
53    #[serde(rename = "timestamp")]
54    pub time: U256,
55}
56
57impl CoreBlock {
58    pub fn number(&self) -> Option<U64> {
59        self.number
60    }
61
62    pub fn hash(&self) -> Option<B256> {
63        self.hash
64    }
65
66    pub fn parent_hash(&self) -> B256 {
67        self.parent_hash
68    }
69
70    pub fn mix_digest(&self) -> Option<B256> {
71        self.mix_digest
72    }
73
74    pub fn uncle_hash(&self) -> B256 {
75        self.uncle_hash
76    }
77
78    pub fn receipt_hash(&self) -> B256 {
79        self.receipt_hash
80    }
81
82    pub fn root(&self) -> B256 {
83        self.root
84    }
85
86    pub fn tx_hash(&self) -> B256 {
87        self.tx_hash
88    }
89
90    pub fn coinbase(&self) -> Address {
91        self.coinbase
92    }
93
94    pub fn nonce(&self) -> Option<B64> {
95        self.nonce
96    }
97
98    pub fn gas_used(&self) -> U256 {
99        self.gas_used
100    }
101
102    pub fn gas_limit(&self) -> U256 {
103        self.gas_limit
104    }
105
106    pub fn difficulty(&self) -> U256 {
107        self.difficulty
108    }
109
110    pub fn extra(&self) -> Bytes {
111        self.extra.clone()
112    }
113
114    pub fn bloom(&self) -> Option<Bytes> {
115        self.bloom.clone()
116    }
117
118    pub fn time(&self) -> U256 {
119        self.time
120    }
121
122    pub async fn insert(&self, pool: &sqlx::PgPool, network: NetworkKind) -> Result<(), SqlError> {
123        let number = self.number.map(|v| v.to::<i64>());
124        let hash = self.hash.as_ref().map(|v| v.as_slice());
125        let parent_hash = self.parent_hash.as_slice();
126        let mix_digest = self.mix_digest.as_ref().map(|v| v.as_slice());
127        let uncle_hash = self.uncle_hash.as_slice();
128        let receipt_hash = self.receipt_hash.as_slice();
129        let root = self.root.as_slice();
130        let tx_hash = self.tx_hash.as_slice();
131        let coinbase = self.coinbase.as_slice();
132        let nonce = self.nonce.as_ref().map(|v| v.as_slice());
133        let gas_used = self.gas_used.as_le_slice();
134        let gas_limit = self.gas_limit.as_le_slice();
135        let difficulty = self.difficulty.as_le_slice();
136        let extra = self.extra.to_vec();
137        let bloom = self.bloom.as_ref().map(|v| v.to_vec());
138        let time = self.time.to::<i64>();
139
140        let query = r#"
141            INSERT INTO block (
142                network,
143
144                number,
145                hash,
146                parent_hash,
147                mix_digest,
148                uncle_hash,
149                receipt_hash,
150                root,
151                tx_hash,
152                coinbase,
153                nonce,
154                gas_used,
155                gas_limit,
156                difficulty,
157                extra,
158                bloom,
159                time
160            )
161            VALUES (
162                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17
163            ) ON CONFLICT DO NOTHING
164        "#;
165
166        sqlx::query(query)
167            .bind(network)
168            .bind(number)
169            .bind(hash)
170            .bind(parent_hash)
171            .bind(mix_digest)
172            .bind(uncle_hash)
173            .bind(receipt_hash)
174            .bind(root)
175            .bind(tx_hash)
176            .bind(coinbase)
177            .bind(nonce)
178            .bind(gas_used)
179            .bind(gas_limit)
180            .bind(difficulty)
181            .bind(extra)
182            .bind(bloom)
183            .bind(time)
184            .execute(pool)
185            .await?;
186
187        Ok(())
188    }
189
190    pub async fn emit(
191        &self,
192        queue: &redis::Client,
193        network: &NetworkKind,
194    ) -> Result<(), PropagateError> {
195        let mut con = queue.get_async_connection().await?;
196
197        let channel = format!("{}:{}", network, ResourceKind::Block);
198        con.lpush(channel, serde_json::to_string(self)?).await?;
199
200        Ok(())
201    }
202
203    pub async fn stream(
204        &self,
205        queue: &redis::Client,
206        network: &crate::networks::NetworkKind,
207    ) -> Result<(), PropagateError> {
208        let mut con = queue.get_async_connection().await?;
209
210        let key = format!("{}:{}", network, ResourceKind::Block);
211        let serialized_data = serde_json::to_string(self)?;
212
213        let res: redis::RedisResult<String> = redis::cmd("XADD")
214            .arg(&key)
215            .arg("MAXLEN")
216            .arg("~")
217            .arg("100000")
218            .arg("*")
219            .arg("payload")
220            .arg(serialized_data)
221            .query_async(&mut con)
222            .await;
223
224        match res {
225            Ok(_) => Ok(()),
226            Err(e) => Err(PropagateError::StreamError(e.to_string())),
227        }
228    }
229
230    #[allow(clippy::type_complexity)]
231    pub(crate) fn db_repr(
232        &self,
233    ) -> (
234        Option<i64>,
235        Option<&[u8]>,
236        &[u8],
237        Option<&[u8]>,
238        &[u8],
239        &[u8],
240        &[u8],
241        &[u8],
242        &[u8],
243        Option<&[u8]>,
244        &[u8],
245        &[u8],
246        &[u8],
247        Vec<u8>,
248        Option<Vec<u8>>,
249        i64,
250    ) {
251        let number = self.number.map(|v| v.to::<i64>());
252        let hash = self.hash.as_ref().map(|v| v.as_slice());
253        let parent_hash = self.parent_hash.as_slice();
254        let mix_digest = self.mix_digest.as_ref().map(|v| v.as_slice());
255        let uncle_hash = self.uncle_hash.as_slice();
256        let receipt_hash = self.receipt_hash.as_slice();
257        let root = self.root.as_slice();
258        let tx_hash = self.tx_hash.as_slice();
259        let coinbase = self.coinbase.as_slice();
260        let nonce = self.nonce.as_ref().map(|v| v.as_slice());
261        let gas_used = self.gas_used.as_le_slice();
262        let gas_limit = self.gas_limit.as_le_slice();
263        let difficulty = self.difficulty.as_le_slice();
264        let extra = self.extra.to_vec();
265        let bloom = self.bloom.as_ref().map(|v| v.to_vec());
266        let time = self.time.to::<i64>();
267
268        (
269            number,
270            hash,
271            parent_hash,
272            mix_digest,
273            uncle_hash,
274            receipt_hash,
275            root,
276            tx_hash,
277            coinbase,
278            nonce,
279            gas_used,
280            gas_limit,
281            difficulty,
282            extra,
283            bloom,
284            time,
285        )
286    }
287}
288
289#[derive(
290    Clone,
291    Debug,
292    Default,
293    serde::Deserialize,
294    serde::Serialize,
295    PartialEq,
296    Eq,
297    Hash,
298    FromRow,
299    ToSchema,
300)]
301pub struct CoreLog {
302    pub address: Address,
303    #[serde(rename = "blockHash")]
304    pub block_hash: Option<B256>,
305    #[serde(rename = "blockNumber")]
306    pub block_number: Option<U64>,
307    pub data: Bytes,
308    #[serde(rename = "logIndex")]
309    pub log_index: Option<U64>,
310    pub removed: bool,
311    pub topics: Vec<B256>,
312    #[serde(rename = "transactionIndex")]
313    pub tx_index: Option<U64>,
314    #[serde(rename = "transactionHash")]
315    pub tx_hash: Option<B256>,
316}
317
318impl CoreLog {
319    pub async fn insert(&self, pool: &sqlx::PgPool, network: NetworkKind) -> Result<(), SqlError> {
320        let address = self.address.as_slice();
321        let block_hash = self.block_hash.as_ref().map(|v| v.as_slice());
322        let block_number = self.block_number.map(|v| v.to::<i64>());
323        let data = self.data.0.as_ref();
324        let log_index = self.log_index.map(|v| v.to::<i64>());
325        let removed = self.removed;
326        let topic0 = self.topics.first().map(|v| v.as_slice());
327        let topic1 = self.topics.get(1).map(|v| v.as_slice());
328        let topic2 = self.topics.get(2).map(|v| v.as_slice());
329        let topic3 = self.topics.get(3).map(|v| v.as_slice());
330        let tx_index = self.tx_index.map(|v| v.to::<i64>());
331        let tx_hash = self.tx_hash.as_ref().map(|v| v.as_slice());
332
333        let query = r#"
334            INSERT INTO log (
335                network,
336                address,
337                block_hash,
338                block_number,
339                data,
340                log_index,
341                removed,
342                topic0,
343                topic1,
344                topic2,
345                topic3,
346                tx_index,
347                tx_hash
348            )
349            VALUES (
350                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
351            ) ON CONFLICT DO NOTHING
352        "#;
353
354        sqlx::query(query)
355            .bind(network)
356            .bind(address)
357            .bind(block_hash)
358            .bind(block_number)
359            .bind(data)
360            .bind(log_index)
361            .bind(removed)
362            .bind(topic0)
363            .bind(topic1)
364            .bind(topic2)
365            .bind(topic3)
366            .bind(tx_index)
367            .bind(tx_hash)
368            .execute(pool)
369            .await?;
370
371        Ok(())
372    }
373
374    pub async fn emit(
375        &self,
376        queue: &redis::Client,
377        network: &NetworkKind,
378    ) -> Result<(), PropagateError> {
379        let mut con = queue.get_async_connection().await?;
380
381        let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::Raw));
382        con.lpush(channel, serde_json::to_string(self)?).await?;
383
384        Ok(())
385    }
386
387    pub async fn stream(
388        &self,
389        queue: &redis::Client,
390        network: &crate::networks::NetworkKind,
391    ) -> Result<(), PropagateError> {
392        let mut con = queue.get_async_connection().await?;
393
394        let key = format!("{}:{}", network, ResourceKind::Log(LogKind::Raw));
395        let serialized_data = serde_json::to_string(self)?;
396
397        let res: redis::RedisResult<String> = redis::cmd("XADD")
398            .arg(&key)
399            .arg("MAXLEN")
400            .arg("~")
401            .arg("100000")
402            .arg("*")
403            .arg("payload")
404            .arg(serialized_data)
405            .query_async(&mut con)
406            .await;
407
408        match res {
409            Ok(_) => Ok(()),
410            Err(e) => Err(PropagateError::StreamError(e.to_string())),
411        }
412    }
413
414    #[allow(clippy::type_complexity)]
415    pub(crate) fn db_repr(
416        &self,
417    ) -> (
418        &[u8],
419        Option<&[u8]>,
420        Option<i64>,
421        &[u8],
422        Option<i64>,
423        bool,
424        Option<&[u8]>,
425        Option<&[u8]>,
426        Option<&[u8]>,
427        Option<&[u8]>,
428        Option<i64>,
429        Option<&[u8]>,
430    ) {
431        let address = self.address.as_slice();
432        let block_hash = self.block_hash.as_ref().map(|v| v.as_slice());
433        let block_number = self.block_number.map(|v| v.to::<i64>());
434        let data = self.data.0.as_ref();
435        let log_index = self.log_index.map(|v| v.to::<i64>());
436        let removed = self.removed;
437        let topic0 = self.topics.first().map(|v| v.as_slice());
438        let topic1 = self.topics.get(1).map(|v| v.as_slice());
439        let topic2 = self.topics.get(2).map(|v| v.as_slice());
440        let topic3 = self.topics.get(3).map(|v| v.as_slice());
441        let tx_index = self.tx_index.map(|v| v.to::<i64>());
442        let tx_hash = self.tx_hash.as_ref().map(|v| v.as_slice());
443
444        (
445            address,
446            block_hash,
447            block_number,
448            data,
449            log_index,
450            removed,
451            topic0,
452            topic1,
453            topic2,
454            topic3,
455            tx_index,
456            tx_hash,
457        )
458    }
459}
460
461#[cfg(test)]
462mod tests {
463    use super::*;
464
465    #[test]
466    fn deserialize_core_block() {
467        let json = serde_json::json!({
468            "hash": "0x4debecd96c87bd9be70b2a428d1e2d537e7f3ce77e353a7f031b4b66fb4d12eb",
469            "parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
470            "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
471            "miner": "0x0000000000000000000000000000000000000000",
472            "stateRoot": "0x0000000000000000000000000000000000000000000000000000000000000000",
473            "transactionsRoot": "0x0000000000000000000000000000000000000000000000000000000000000000",
474            "receiptsRoot": "0x0000000000000000000000000000000000000000000000000000000000000000",
475            "number": "0x1155a9",
476            "gasUsed": "0x0",
477            "gasLimit": "0x0",
478            "extraData": "0x",
479            "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
480            "timestamp": "0x65f06454",
481            "difficulty": "0x0",
482            "mixHash": null,
483            "nonce": null
484        });
485
486        assert!(serde_json::from_value::<CoreBlock>(json).is_ok());
487        assert!(serde_json::from_value::<CoreBlock>(serde_json::json!({})).is_err());
488    }
489
490    #[test]
491    fn deserialize_core_log() {
492        let json = serde_json::json!(
493            {
494                "address": "0xdac17f958d2ee523a2206206994597c13d831ec7",
495                "blockHash": "0xfad3e899227b47062b71c90e61eeb056a43052be544bc006031b10df8abc92f4",
496                "blockNumber": "0x1286817",
497                "data": "0x0000000000000000000000000000000000000000000000000000000077359400",
498                "logIndex": "0x75",
499                "removed": false,
500                "topics": [
501                  "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
502                  "0x0000000000000000000000004d06a4779ae0ed965598a1ef2b86b95a41ad7e81",
503                  "0x00000000000000000000000011235534a66a33c366b84933d5202c841539d1c9"
504                ],
505                "transactionHash": "0x8acd636a4e0a0165bfbf003aa202a87b1a8e17e05183650ad39415861555aa6e",
506                "transactionIndex": "0x82"
507              }
508        );
509
510        assert!(serde_json::from_value::<CoreLog>(json).is_ok());
511        assert!(serde_json::from_value::<CoreLog>(serde_json::json!({})).is_err());
512    }
513}