eventify_primitives/networks/zksync/
log.rs1use alloy_primitives::U64;
2use eyre::Result;
3use redis::AsyncCommands;
4use sqlx::{prelude::FromRow, Error as SqlError};
5use utoipa::ToSchema;
6
7use crate::{
8 networks::{core::CoreLog, LogKind, NetworkKind, ResourceKind},
9 traits::{Emit, Insert, Log, Stream},
10 PropagateError,
11};
12
13#[derive(
14 Clone,
15 Debug,
16 Default,
17 serde::Deserialize,
18 serde::Serialize,
19 PartialEq,
20 Eq,
21 Hash,
22 FromRow,
23 ToSchema,
24)]
25pub struct ZksyncLog {
26 #[serde(flatten)]
27 core: CoreLog,
28
29 #[serde(rename = "l1BatchNumber")]
30 pub l1_batch_number: Option<U64>,
31 #[serde(rename = "transactionLogIndex")]
32 pub tx_log_index: Option<U64>,
33 #[serde(rename = "logType")]
34 pub log_type: Option<String>,
35}
36
37impl Log for ZksyncLog {
38 fn core(&self) -> &CoreLog {
39 &self.core
40 }
41}
42
43impl Insert for ZksyncLog {
44 async fn insert(
45 &self,
46 pool: &sqlx::PgPool,
47 _: &Option<alloy_primitives::B256>,
48 ) -> Result<(), SqlError> {
49 let (
50 address,
51 block_hash,
52 block_number,
53 data,
54 log_index,
55 removed,
56 topic0,
57 topic1,
58 topic2,
59 topic3,
60 tx_index,
61 tx_hash,
62 ) = self.core().db_repr();
63
64 let l1_batch_number = self.l1_batch_number.map(|v| v.to::<i64>());
65 let tx_log_index = self.tx_log_index.map(|v| v.to::<i64>());
66
67 let query = r#"
68 INSERT INTO log (
69 network,
70 address,
71 block_hash,
72 block_number,
73 data,
74 log_index,
75 removed,
76 topic0,
77 topic1,
78 topic2,
79 topic3,
80 tx_index,
81 tx_hash,
82
83 l1_batch_number,
84 tx_log_index,
85 log_type
86 )
87 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
88 ON CONFLICT DO NOTHING
89 "#;
90
91 sqlx::query(query)
92 .bind(NetworkKind::Zksync)
93 .bind(address)
94 .bind(block_hash)
95 .bind(block_number)
96 .bind(data)
97 .bind(log_index)
98 .bind(removed)
99 .bind(topic0)
100 .bind(topic1)
101 .bind(topic2)
102 .bind(topic3)
103 .bind(tx_index)
104 .bind(tx_hash)
105 .bind(l1_batch_number)
106 .bind(tx_log_index)
107 .bind(&self.log_type)
108 .execute(pool)
109 .await?;
110
111 Ok(())
112 }
113}
114
115impl Emit for ZksyncLog {
116 async fn emit(
117 &self,
118 queue: &redis::Client,
119 network: &crate::networks::NetworkKind,
120 ) -> eyre::Result<(), PropagateError> {
121 let mut con = queue.get_async_connection().await?;
122
123 let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::Raw));
124 con.lpush(channel, serde_json::to_string(self)?).await?;
125
126 Ok(())
127 }
128}
129
130impl Stream for ZksyncLog {
131 async fn stream(
132 &self,
133 queue: &redis::Client,
134 network: &crate::networks::NetworkKind,
135 ) -> Result<(), PropagateError> {
136 let mut con = queue.get_async_connection().await?;
137
138 let key = format!("{}:{}", network, ResourceKind::Log(LogKind::Raw));
139 let serialized_data = serde_json::to_string(self)?;
140
141 let res: redis::RedisResult<String> = redis::cmd("XADD")
142 .arg(&key)
143 .arg("MAXLEN")
144 .arg("~")
145 .arg("100000")
146 .arg("*")
147 .arg("payload")
148 .arg(serialized_data)
149 .query_async(&mut con)
150 .await;
151
152 match res {
153 Ok(_) => Ok(()),
154 Err(e) => Err(PropagateError::StreamError(e.to_string())),
155 }
156 }
157}
158
#[cfg(test)]
mod tests {
    use super::*;

    /// A representative zkSync log payload must deserialize, and the renamed
    /// zkSync-specific keys must end up in their fields. Because those fields
    /// are all `Option`, checking only `is_ok()` would not notice a broken
    /// `#[serde(rename = ..)]`.
    #[test]
    fn test_deserialize_zksync_log() {
        let json = serde_json::json!({
            "address": "0x000000000000000000000000000000000000800a",
            "topics": [
                "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
                "0x0000000000000000000000000000000000000000000000000000000000008001",
                "0x00000000000000000000000056ddd604011c5f8629bd7c2472e3504bd32c269b"
            ],
            "data": "0x00000000000000000000000000000000000000000000000000014c39ba59ba00",
            "blockHash": "0x13f57e3e974d3e5bfe80e4588e6865e008d9c7a72ba55104924f1c9ee6999185",
            "blockNumber": "0x11451a",
            "l1BatchNumber": null,
            "transactionHash": "0xea856ad27c508a5824dc0399fd0a412e88213c496ac4e50549bbdd62619a952e",
            "transactionIndex": "0x0",
            "logIndex": "0x5",
            "transactionLogIndex": "0x5",
            "logType": null,
            "removed": false
        });

        let log = serde_json::from_value::<ZksyncLog>(json)
            .expect("a well-formed zkSync log must deserialize");

        assert_eq!(log.l1_batch_number, None);
        assert_eq!(log.tx_log_index, Some(U64::from(5u64)));
        assert_eq!(log.log_type, None);
    }

    /// An empty JSON object must be rejected.
    #[test]
    fn test_deserialize_empty_zksync_log() {
        let json = serde_json::json!({});

        assert!(serde_json::from_value::<ZksyncLog>(json).is_err());
    }
}
193}