//! eventify_primitives/networks/zksync/log.rs

1use alloy_primitives::U64;
2use eyre::Result;
3use redis::AsyncCommands;
4use sqlx::{prelude::FromRow, Error as SqlError};
5use utoipa::ToSchema;
6
7use crate::{
8    networks::{core::CoreLog, LogKind, NetworkKind, ResourceKind},
9    traits::{Emit, Insert, Log, Stream},
10    PropagateError,
11};
12
/// A log entry as returned by a Zksync node's JSON-RPC API.
///
/// Wraps the network-agnostic [`CoreLog`] fields (merged into this
/// struct's JSON object via `#[serde(flatten)]`) and adds the
/// Zksync-specific extensions below.
#[derive(
    Clone,
    Debug,
    Default,
    serde::Deserialize,
    serde::Serialize,
    PartialEq,
    Eq,
    Hash,
    FromRow,
    ToSchema,
)]
pub struct ZksyncLog {
    // Shared log fields (address, topics, data, block/tx info, ...);
    // kept private — exposed read-only through `Log::core()`.
    #[serde(flatten)]
    core: CoreLog,

    // NOTE(review): presumably the L1 batch the log's block belongs to;
    // the test fixture shows it can be null — confirm against node docs.
    #[serde(rename = "l1BatchNumber")]
    pub l1_batch_number: Option<U64>,
    // NOTE(review): looks like the log's index within its transaction
    // (vs. `logIndex` within the block) — confirm.
    #[serde(rename = "transactionLogIndex")]
    pub tx_log_index: Option<U64>,
    // Opaque node-provided type tag; null in the observed fixture.
    #[serde(rename = "logType")]
    pub log_type: Option<String>,
}
36
37impl Log for ZksyncLog {
38    fn core(&self) -> &CoreLog {
39        &self.core
40    }
41}
42
43impl Insert for ZksyncLog {
44    async fn insert(
45        &self,
46        pool: &sqlx::PgPool,
47        _: &Option<alloy_primitives::B256>,
48    ) -> Result<(), SqlError> {
49        let (
50            address,
51            block_hash,
52            block_number,
53            data,
54            log_index,
55            removed,
56            topic0,
57            topic1,
58            topic2,
59            topic3,
60            tx_index,
61            tx_hash,
62        ) = self.core().db_repr();
63
64        let l1_batch_number = self.l1_batch_number.map(|v| v.to::<i64>());
65        let tx_log_index = self.tx_log_index.map(|v| v.to::<i64>());
66
67        let query = r#"
68            INSERT INTO log (
69                network,
70                address,
71                block_hash,
72                block_number,
73                data,
74                log_index,
75                removed,
76                topic0,
77                topic1,
78                topic2,
79                topic3,
80                tx_index,
81                tx_hash,
82
83                l1_batch_number,
84                tx_log_index,
85                log_type
86            )
87            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
88            ON CONFLICT DO NOTHING
89            "#;
90
91        sqlx::query(query)
92            .bind(NetworkKind::Zksync)
93            .bind(address)
94            .bind(block_hash)
95            .bind(block_number)
96            .bind(data)
97            .bind(log_index)
98            .bind(removed)
99            .bind(topic0)
100            .bind(topic1)
101            .bind(topic2)
102            .bind(topic3)
103            .bind(tx_index)
104            .bind(tx_hash)
105            .bind(l1_batch_number)
106            .bind(tx_log_index)
107            .bind(&self.log_type)
108            .execute(pool)
109            .await?;
110
111        Ok(())
112    }
113}
114
115impl Emit for ZksyncLog {
116    async fn emit(
117        &self,
118        queue: &redis::Client,
119        network: &crate::networks::NetworkKind,
120    ) -> eyre::Result<(), PropagateError> {
121        let mut con = queue.get_async_connection().await?;
122
123        let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::Raw));
124        con.lpush(channel, serde_json::to_string(self)?).await?;
125
126        Ok(())
127    }
128}
129
130impl Stream for ZksyncLog {
131    async fn stream(
132        &self,
133        queue: &redis::Client,
134        network: &crate::networks::NetworkKind,
135    ) -> Result<(), PropagateError> {
136        let mut con = queue.get_async_connection().await?;
137
138        let key = format!("{}:{}", network, ResourceKind::Log(LogKind::Raw));
139        let serialized_data = serde_json::to_string(self)?;
140
141        let res: redis::RedisResult<String> = redis::cmd("XADD")
142            .arg(&key)
143            .arg("MAXLEN")
144            .arg("~")
145            .arg("100000")
146            .arg("*")
147            .arg("payload")
148            .arg(serialized_data)
149            .query_async(&mut con)
150            .await;
151
152        match res {
153            Ok(_) => Ok(()),
154            Err(e) => Err(PropagateError::StreamError(e.to_string())),
155        }
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::*;

    /// A realistic Zksync RPC log payload must deserialize, including
    /// null `l1BatchNumber` / `logType` and the flattened core fields.
    #[test]
    fn test_deserialize_zksync_log() {
        let json = serde_json::json!({
          "address": "0x000000000000000000000000000000000000800a",
          "topics": [
            "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
            "0x0000000000000000000000000000000000000000000000000000000000008001",
            "0x00000000000000000000000056ddd604011c5f8629bd7c2472e3504bd32c269b"
          ],
          "data": "0x00000000000000000000000000000000000000000000000000014c39ba59ba00",
          "blockHash": "0x13f57e3e974d3e5bfe80e4588e6865e008d9c7a72ba55104924f1c9ee6999185",
          "blockNumber": "0x11451a",
          "l1BatchNumber": null,
          "transactionHash": "0xea856ad27c508a5824dc0399fd0a412e88213c496ac4e50549bbdd62619a952e",
          "transactionIndex": "0x0",
          "logIndex": "0x5",
          "transactionLogIndex": "0x5",
          "logType": null,
          "removed": false
        });

        let parsed: Result<ZksyncLog, _> = serde_json::from_value(json);
        assert!(parsed.is_ok());
    }

    /// The core fields are mandatory, so an empty object is rejected.
    #[test]
    fn test_deserialize_empty_zksync_log() {
        let parsed: Result<ZksyncLog, _> = serde_json::from_value(serde_json::json!({}));
        assert!(parsed.is_err());
    }
}