eventify_primitives/events/
erc721.rs1use alloy_primitives::B256;
2use eyre::Result;
3use redis::AsyncCommands;
4use sqlx::{Error as SqlError, PgPool};
5
6use super::ERC721;
7use crate::{
8 networks::{LogKind, ResourceKind},
9 traits::{Emit, Insert, Stream},
10 PropagateError,
11};
12
13impl Insert for ERC721::Transfer {
14 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
15 let tx = tx_hash.as_ref().map(|v| v.as_slice());
16 let from = self.from.as_slice();
17 let to = self.to.as_slice();
18 let token_id = self.tokenId.as_le_slice();
19
20 let sql = r#"INSERT INTO erc721_transfer (
21 tx_hash,
22 "from",
23 "to",
24 token_id )
25 VALUES (
26 $1, $2, $3, $4
27 ) ON CONFLICT DO NOTHING"#;
28
29 sqlx::query(sql)
30 .bind(tx)
31 .bind(from)
32 .bind(to)
33 .bind(token_id)
34 .execute(pool)
35 .await?;
36
37 Ok(())
38 }
39}
40
41impl Emit for ERC721::Transfer {
42 async fn emit(
43 &self,
44 queue: &redis::Client,
45 network: &crate::networks::NetworkKind,
46 ) -> Result<(), PropagateError> {
47 let mut con = queue.get_async_connection().await?;
48
49 let channel = format!(
50 "{}:{}",
51 network,
52 ResourceKind::Log(LogKind::ERC721_Transfer)
53 );
54 con.lpush(channel, serde_json::to_string(self)?).await?;
55
56 Ok(())
57 }
58}
59
60impl Stream for ERC721::Transfer {
61 async fn stream(
62 &self,
63 queue: &redis::Client,
64 network: &crate::networks::NetworkKind,
65 ) -> Result<(), PropagateError> {
66 let mut con = queue.get_async_connection().await?;
67
68 let key = format!(
69 "{}:{}",
70 network,
71 ResourceKind::Log(LogKind::ERC721_Transfer)
72 );
73 let serialized_data = serde_json::to_string(self)?;
74
75 let res: redis::RedisResult<String> = redis::cmd("XADD")
76 .arg(&key)
77 .arg("MAXLEN")
78 .arg("~")
79 .arg("100000")
80 .arg("*")
81 .arg("payload")
82 .arg(serialized_data)
83 .query_async(&mut con)
84 .await;
85
86 match res {
87 Ok(_) => Ok(()),
88 Err(e) => Err(PropagateError::StreamError(e.to_string())),
89 }
90 }
91}
92
93impl Insert for ERC721::Approval {
94 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
95 let tx = tx_hash.as_ref().map(|v| v.as_slice());
96 let owner = self.owner.as_slice();
97 let approved = self.approved.as_slice();
98 let token_id = self.tokenId.as_le_slice();
99
100 let sql = r#"INSERT INTO erc721_approval (
101 tx_hash,
102 "owner",
103 approved,
104 token_id )
105 VALUES (
106 $1, $2, $3, $4
107 ) ON CONFLICT DO NOTHING"#;
108
109 sqlx::query(sql)
110 .bind(tx)
111 .bind(owner)
112 .bind(approved)
113 .bind(token_id)
114 .execute(pool)
115 .await?;
116
117 Ok(())
118 }
119}
120
121impl Emit for ERC721::Approval {
122 async fn emit(
123 &self,
124 queue: &redis::Client,
125 network: &crate::networks::NetworkKind,
126 ) -> Result<(), PropagateError> {
127 let mut con = queue.get_async_connection().await?;
128
129 let channel = format!(
130 "{}:{}",
131 network,
132 ResourceKind::Log(LogKind::ERC721_Approval)
133 );
134 con.lpush(channel, serde_json::to_string(self)?).await?;
135
136 Ok(())
137 }
138}
139
140impl Stream for ERC721::Approval {
141 async fn stream(
142 &self,
143 queue: &redis::Client,
144 network: &crate::networks::NetworkKind,
145 ) -> Result<(), PropagateError> {
146 let mut con = queue.get_async_connection().await?;
147
148 let key = format!(
149 "{}:{}",
150 network,
151 ResourceKind::Log(LogKind::ERC721_Approval)
152 );
153 let serialized_data = serde_json::to_string(self)?;
154
155 let res: redis::RedisResult<String> = redis::cmd("XADD")
156 .arg(&key)
157 .arg("MAXLEN")
158 .arg("~")
159 .arg("100000")
160 .arg("*")
161 .arg("payload")
162 .arg(serialized_data)
163 .query_async(&mut con)
164 .await;
165
166 match res {
167 Ok(_) => Ok(()),
168 Err(e) => Err(PropagateError::StreamError(e.to_string())),
169 }
170 }
171}
172
173impl Insert for ERC721::ApprovalForAll {
174 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
175 let tx = tx_hash.as_ref().map(|v| v.as_slice());
176 let owner = self.owner.as_slice();
177 let operator = self.operator.as_slice();
178 let approved = self.approved;
179
180 let sql = r#"INSERT INTO erc_approval_for_all (
181 tx_hash,
182 "owner",
183 operator,
184 approved )
185 VALUES (
186 $1, $2, $3, $4
187 ) ON CONFLICT DO NOTHING"#;
188
189 sqlx::query(sql)
190 .bind(tx)
191 .bind(owner)
192 .bind(operator)
193 .bind(approved)
194 .execute(pool)
195 .await?;
196
197 Ok(())
198 }
199}
200
201impl Emit for ERC721::ApprovalForAll {
202 async fn emit(
203 &self,
204 queue: &redis::Client,
205 network: &crate::networks::NetworkKind,
206 ) -> Result<(), PropagateError> {
207 let mut con = queue.get_async_connection().await?;
208
209 let channel = format!(
210 "{}:{}",
211 network,
212 ResourceKind::Log(LogKind::ERC721_ApprovalForAll)
213 );
214 con.lpush(channel, serde_json::to_string(self)?).await?;
215
216 Ok(())
217 }
218}
219
220impl Stream for ERC721::ApprovalForAll {
221 async fn stream(
222 &self,
223 queue: &redis::Client,
224 network: &crate::networks::NetworkKind,
225 ) -> Result<(), PropagateError> {
226 let mut con = queue.get_async_connection().await?;
227
228 let key = format!(
229 "{}:{}",
230 network,
231 ResourceKind::Log(LogKind::ERC721_ApprovalForAll)
232 );
233 let serialized_data = serde_json::to_string(self)?;
234
235 let res: redis::RedisResult<String> = redis::cmd("XADD")
236 .arg(&key)
237 .arg("MAXLEN")
238 .arg("~")
239 .arg("100000")
240 .arg("*")
241 .arg("payload")
242 .arg(serialized_data)
243 .query_async(&mut con)
244 .await;
245
246 match res {
247 Ok(_) => Ok(()),
248 Err(e) => Err(PropagateError::StreamError(e.to_string())),
249 }
250 }
251}