eventify_primitives/events/
erc20.rs1use alloy_primitives::B256;
2use eyre::Result;
3use redis::AsyncCommands;
4use sqlx::{Error as SqlError, PgPool};
5
6use super::ERC20;
7use crate::{
8 networks::{LogKind, ResourceKind},
9 traits::{Emit, Insert, Stream},
10 PropagateError,
11};
12
13impl Insert for ERC20::Transfer {
14 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
15 let tx = tx_hash.as_ref().map(|v| v.as_slice());
16 let from = self.from.as_slice();
17 let to = self.to.as_slice();
18 let value = self.value.as_le_slice();
19
20 let sql = r#"INSERT INTO erc20_transfer (
21 tx_hash,
22 "from",
23 "to",
24 value )
25 VALUES (
26 $1, $2, $3, $4
27 ) ON CONFLICT DO NOTHING"#;
28
29 sqlx::query(sql)
30 .bind(tx)
31 .bind(from)
32 .bind(to)
33 .bind(value)
34 .execute(pool)
35 .await?;
36
37 Ok(())
38 }
39}
40
41impl Emit for ERC20::Transfer {
42 async fn emit(
43 &self,
44 queue: &redis::Client,
45 network: &crate::networks::NetworkKind,
46 ) -> Result<(), PropagateError> {
47 let mut con = queue.get_async_connection().await?;
48
49 let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC20_Transfer));
50 con.lpush(channel, serde_json::to_string(self)?).await?;
51
52 Ok(())
53 }
54}
55
56impl Stream for ERC20::Transfer {
57 async fn stream(
60 &self,
61 queue: &redis::Client,
62 network: &crate::networks::NetworkKind,
63 ) -> Result<(), PropagateError> {
64 let mut con = queue.get_async_connection().await?;
65
66 let key = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC20_Transfer));
67 let serialized_data = serde_json::to_string(self)?;
68
69 let res: redis::RedisResult<String> = redis::cmd("XADD")
70 .arg(&key)
71 .arg("MAXLEN")
72 .arg("~")
73 .arg("100000")
74 .arg("*")
75 .arg("payload")
76 .arg(serialized_data)
77 .query_async(&mut con)
78 .await;
79
80 match res {
81 Ok(_) => Ok(()),
82 Err(e) => Err(PropagateError::StreamError(e.to_string())),
83 }
84 }
85}
86
87impl Insert for ERC20::Approval {
88 async fn insert(
89 &self,
90 pool: &PgPool,
91 tx_hash: &Option<alloy_primitives::B256>,
92 ) -> Result<(), SqlError> {
93 let tx = tx_hash.as_ref().map(|v| v.as_slice());
94 let owner = self.owner.as_slice();
95 let spender = self.spender.as_slice();
96 let value = self.value.as_le_slice();
97
98 let sql = r#"INSERT INTO erc20_approval (
99 tx_hash,
100 "owner",
101 spender,
102 "value" )
103 VALUES (
104 $1, $2, $3, $4
105 ) ON CONFLICT DO NOTHING"#;
106
107 sqlx::query(sql)
108 .bind(tx)
109 .bind(owner)
110 .bind(spender)
111 .bind(value)
112 .execute(pool)
113 .await?;
114
115 Ok(())
116 }
117}
118
119impl Emit for ERC20::Approval {
120 async fn emit(
121 &self,
122 queue: &redis::Client,
123 network: &crate::networks::NetworkKind,
124 ) -> Result<(), PropagateError> {
125 let mut con = queue.get_async_connection().await?;
126
127 let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC20_Approval));
128 con.lpush(channel, serde_json::to_string(self)?).await?;
129
130 Ok(())
131 }
132}
133
134impl Stream for ERC20::Approval {
135 async fn stream(
136 &self,
137 queue: &redis::Client,
138 network: &crate::networks::NetworkKind,
139 ) -> Result<(), PropagateError> {
140 let mut con = queue.get_async_connection().await?;
141
142 let key = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC20_Approval));
143 let serialized_data = serde_json::to_string(self)?;
144
145 let res: redis::RedisResult<String> = redis::cmd("XADD")
146 .arg(&key)
147 .arg("MAXLEN")
148 .arg("~")
149 .arg("100000")
150 .arg("*")
151 .arg("payload")
152 .arg(serialized_data)
153 .query_async(&mut con)
154 .await;
155
156 match res {
157 Ok(_) => Ok(()),
158 Err(e) => Err(PropagateError::StreamError(e.to_string())),
159 }
160 }
161}