eventify_primitives/events/
erc777.rs1use alloy_primitives::B256;
2use eyre::Result;
3use redis::AsyncCommands;
4use sqlx::{Error as SqlError, PgPool};
5
6use super::ERC777;
7use crate::{
8 networks::{LogKind, ResourceKind},
9 traits::{Emit, Insert, Stream},
10 PropagateError,
11};
12
13impl Insert for ERC777::Sent {
14 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
15 let tx = tx_hash.as_ref().map(|v| v.as_slice());
16 let operator = self.operator.as_slice();
17 let from = self.from.as_slice();
18 let to = self.to.as_slice();
19 let amount = self.amount.as_le_slice();
20 let data = self.data.as_slice();
21 let operator_data = self.operatorData.as_slice();
22
23 let sql = r#"INSERT INTO erc777_sent (
24 tx_hash,
25 operator,
26 "from",
27 "to",
28 amount,
29 "data",
30 operator_data )
31 VALUES (
32 $1, $2, $3, $4, $5, $6, $7
33 ) ON CONFLICT DO NOTHING"#;
34
35 sqlx::query(sql)
36 .bind(tx)
37 .bind(operator)
38 .bind(from)
39 .bind(to)
40 .bind(amount)
41 .bind(data)
42 .bind(operator_data)
43 .execute(pool)
44 .await?;
45
46 Ok(())
47 }
48}
49
50impl Emit for ERC777::Sent {
51 async fn emit(
52 &self,
53 queue: &redis::Client,
54 network: &crate::networks::NetworkKind,
55 ) -> Result<(), PropagateError> {
56 let mut con = queue.get_async_connection().await?;
57
58 let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC777_Sent));
59 con.lpush(channel, serde_json::to_string(self)?).await?;
60
61 Ok(())
62 }
63}
64
65impl Stream for ERC777::Sent {
66 async fn stream(
67 &self,
68 queue: &redis::Client,
69 network: &crate::networks::NetworkKind,
70 ) -> Result<(), PropagateError> {
71 let mut con = queue.get_async_connection().await?;
72
73 let key = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC777_Sent));
74 let serialized_data = serde_json::to_string(self)?;
75
76 let res: redis::RedisResult<String> = redis::cmd("XADD")
77 .arg(&key)
78 .arg("MAXLEN")
79 .arg("~")
80 .arg("100000")
81 .arg("*")
82 .arg("payload")
83 .arg(serialized_data)
84 .query_async(&mut con)
85 .await;
86
87 match res {
88 Ok(_) => Ok(()),
89 Err(e) => Err(PropagateError::StreamError(e.to_string())),
90 }
91 }
92}
93
94impl Insert for ERC777::Minted {
95 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
96 let tx = tx_hash.as_ref().map(|v| v.as_slice());
97 let operator = self.operator.as_slice();
98 let to = self.to.as_slice();
99 let amount = self.amount.as_le_slice();
100 let data = self.data.as_slice();
101 let operator_data = self.operatorData.as_slice();
102
103 let sql = r#"INSERT INTO erc777_minted (
104 tx_hash,
105 operator,
106 "to",
107 amount,
108 "data",
109 operator_data )
110 VALUES (
111 $1, $2, $3, $4, $5, $6
112 ) ON CONFLICT DO NOTHING"#;
113
114 sqlx::query(sql)
115 .bind(tx)
116 .bind(operator)
117 .bind(to)
118 .bind(amount)
119 .bind(data)
120 .bind(operator_data)
121 .execute(pool)
122 .await?;
123
124 Ok(())
125 }
126}
127
128impl Emit for ERC777::Minted {
129 async fn emit(
130 &self,
131 queue: &redis::Client,
132 network: &crate::networks::NetworkKind,
133 ) -> Result<(), PropagateError> {
134 let mut con = queue.get_async_connection().await?;
135
136 let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC777_Minted));
137 con.lpush(channel, serde_json::to_string(self)?).await?;
138
139 Ok(())
140 }
141}
142
143impl Stream for ERC777::Minted {
144 async fn stream(
145 &self,
146 queue: &redis::Client,
147 network: &crate::networks::NetworkKind,
148 ) -> Result<(), PropagateError> {
149 let mut con = queue.get_async_connection().await?;
150
151 let key = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC777_Minted));
152 let serialized_data = serde_json::to_string(self)?;
153
154 let res: redis::RedisResult<String> = redis::cmd("XADD")
155 .arg(&key)
156 .arg("MAXLEN")
157 .arg("~")
158 .arg("100000")
159 .arg("*")
160 .arg("payload")
161 .arg(serialized_data)
162 .query_async(&mut con)
163 .await;
164
165 match res {
166 Ok(_) => Ok(()),
167 Err(e) => Err(PropagateError::StreamError(e.to_string())),
168 }
169 }
170}
171
172impl Insert for ERC777::Burned {
173 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
174 let tx = tx_hash.as_ref().map(|v| v.as_slice());
175 let operator = self.operator.as_slice();
176 let from = self.from.as_slice();
177 let amount = self.amount.as_le_slice();
178 let data = self.data.as_slice();
179 let operator_data = self.operatorData.as_slice();
180
181 let sql = r#"INSERT INTO erc777_burned (
182 tx_hash,
183 operator,
184 "from",
185 amount,
186 "data",
187 operator_data )
188 VALUES (
189 $1, $2, $3, $4, $5, $6
190 ) ON CONFLICT DO NOTHING"#;
191
192 sqlx::query(sql)
193 .bind(tx)
194 .bind(operator)
195 .bind(from)
196 .bind(amount)
197 .bind(data)
198 .bind(operator_data)
199 .execute(pool)
200 .await?;
201
202 Ok(())
203 }
204}
205
206impl Emit for ERC777::Burned {
207 async fn emit(
208 &self,
209 queue: &redis::Client,
210 network: &crate::networks::NetworkKind,
211 ) -> Result<(), PropagateError> {
212 let mut con = queue.get_async_connection().await?;
213
214 let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC777_Burned));
215 con.lpush(channel, serde_json::to_string(self)?).await?;
216
217 Ok(())
218 }
219}
220
221impl Stream for ERC777::Burned {
222 async fn stream(
223 &self,
224 queue: &redis::Client,
225 network: &crate::networks::NetworkKind,
226 ) -> Result<(), PropagateError> {
227 let mut con = queue.get_async_connection().await?;
228
229 let key = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC777_Burned));
230 let serialized_data = serde_json::to_string(self)?;
231
232 let res: redis::RedisResult<String> = redis::cmd("XADD")
233 .arg(&key)
234 .arg("MAXLEN")
235 .arg("~")
236 .arg("100000")
237 .arg("*")
238 .arg("payload")
239 .arg(serialized_data)
240 .query_async(&mut con)
241 .await;
242
243 match res {
244 Ok(_) => Ok(()),
245 Err(e) => Err(PropagateError::StreamError(e.to_string())),
246 }
247 }
248}
249
250impl Insert for ERC777::AuthorizedOperator {
251 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
252 let tx = tx_hash.as_ref().map(|v| v.as_slice());
253 let operator = self.operator.as_slice();
254 let holder = self.holder.as_slice();
255
256 let sql = r#"INSERT INTO erc777_authorized_operator (
257 tx_hash,
258 operator,
259 holder )
260 VALUES (
261 $1, $2, $3
262 ) ON CONFLICT DO NOTHING"#;
263
264 sqlx::query(sql)
265 .bind(tx)
266 .bind(operator)
267 .bind(holder)
268 .execute(pool)
269 .await?;
270
271 Ok(())
272 }
273}
274
275impl Emit for ERC777::AuthorizedOperator {
276 async fn emit(
277 &self,
278 queue: &redis::Client,
279 network: &crate::networks::NetworkKind,
280 ) -> Result<(), PropagateError> {
281 let mut con = queue.get_async_connection().await?;
282
283 let channel = format!(
284 "{}:{}",
285 network,
286 ResourceKind::Log(LogKind::ERC777_AuthorizedOperator)
287 );
288 con.lpush(channel, serde_json::to_string(self)?).await?;
289
290 Ok(())
291 }
292}
293
294impl Stream for ERC777::AuthorizedOperator {
295 async fn stream(
296 &self,
297 queue: &redis::Client,
298 network: &crate::networks::NetworkKind,
299 ) -> Result<(), PropagateError> {
300 let mut con = queue.get_async_connection().await?;
301
302 let key = format!(
303 "{}:{}",
304 network,
305 ResourceKind::Log(LogKind::ERC777_AuthorizedOperator)
306 );
307 let serialized_data = serde_json::to_string(self)?;
308
309 let res: redis::RedisResult<String> = redis::cmd("XADD")
310 .arg(&key)
311 .arg("MAXLEN")
312 .arg("~")
313 .arg("100000")
314 .arg("*")
315 .arg("payload")
316 .arg(serialized_data)
317 .query_async(&mut con)
318 .await;
319
320 match res {
321 Ok(_) => Ok(()),
322 Err(e) => Err(PropagateError::StreamError(e.to_string())),
323 }
324 }
325}
326
327impl Insert for ERC777::RevokedOperator {
328 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
329 let tx = tx_hash.as_ref().map(|v| v.as_slice());
330 let operator = self.operator.as_slice();
331 let holder = self.holder.as_slice();
332
333 let sql = r#"INSERT INTO erc777_revoked_operator (
334 tx_hash,
335 operator,
336 holder )
337 VALUES (
338 $1, $2, $3
339 ) ON CONFLICT DO NOTHING"#;
340
341 sqlx::query(sql)
342 .bind(tx)
343 .bind(operator)
344 .bind(holder)
345 .execute(pool)
346 .await?;
347
348 Ok(())
349 }
350}
351
352impl Emit for ERC777::RevokedOperator {
353 async fn emit(
354 &self,
355 queue: &redis::Client,
356 network: &crate::networks::NetworkKind,
357 ) -> Result<(), PropagateError> {
358 let mut con = queue.get_async_connection().await?;
359
360 let channel = format!(
361 "{}:{}",
362 network,
363 ResourceKind::Log(LogKind::ERC777_RevokedOperator)
364 );
365 con.lpush(channel, serde_json::to_string(self)?).await?;
366
367 Ok(())
368 }
369}
370
371impl Stream for ERC777::RevokedOperator {
372 async fn stream(
373 &self,
374 queue: &redis::Client,
375 network: &crate::networks::NetworkKind,
376 ) -> Result<(), PropagateError> {
377 let mut con = queue.get_async_connection().await?;
378
379 let key = format!(
380 "{}:{}",
381 network,
382 ResourceKind::Log(LogKind::ERC777_RevokedOperator)
383 );
384 let serialized_data = serde_json::to_string(self)?;
385
386 let res: redis::RedisResult<String> = redis::cmd("XADD")
387 .arg(&key)
388 .arg("MAXLEN")
389 .arg("~")
390 .arg("100000")
391 .arg("*")
392 .arg("payload")
393 .arg(serialized_data)
394 .query_async(&mut con)
395 .await;
396
397 match res {
398 Ok(_) => Ok(()),
399 Err(e) => Err(PropagateError::StreamError(e.to_string())),
400 }
401 }
402}