eventify_primitives/events/
erc20.rs

1use alloy_primitives::B256;
2use eyre::Result;
3use redis::AsyncCommands;
4use sqlx::{Error as SqlError, PgPool};
5
6use super::ERC20;
7use crate::{
8    networks::{LogKind, ResourceKind},
9    traits::{Emit, Insert, Stream},
10    PropagateError,
11};
12
13impl Insert for ERC20::Transfer {
14    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
15        let tx = tx_hash.as_ref().map(|v| v.as_slice());
16        let from = self.from.as_slice();
17        let to = self.to.as_slice();
18        let value = self.value.as_le_slice();
19
20        let sql = r#"INSERT INTO erc20_transfer (
21            tx_hash,
22            "from",
23            "to",
24            value )
25            VALUES (
26                $1, $2, $3, $4
27            ) ON CONFLICT DO NOTHING"#;
28
29        sqlx::query(sql)
30            .bind(tx)
31            .bind(from)
32            .bind(to)
33            .bind(value)
34            .execute(pool)
35            .await?;
36
37        Ok(())
38    }
39}
40
41impl Emit for ERC20::Transfer {
42    async fn emit(
43        &self,
44        queue: &redis::Client,
45        network: &crate::networks::NetworkKind,
46    ) -> Result<(), PropagateError> {
47        let mut con = queue.get_async_connection().await?;
48
49        let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC20_Transfer));
50        con.lpush(channel, serde_json::to_string(self)?).await?;
51
52        Ok(())
53    }
54}
55
56impl Stream for ERC20::Transfer {
57    //type MaxLen = u64;
58
59    async fn stream(
60        &self,
61        queue: &redis::Client,
62        network: &crate::networks::NetworkKind,
63    ) -> Result<(), PropagateError> {
64        let mut con = queue.get_async_connection().await?;
65
66        let key = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC20_Transfer));
67        let serialized_data = serde_json::to_string(self)?;
68
69        let res: redis::RedisResult<String> = redis::cmd("XADD")
70            .arg(&key)
71            .arg("MAXLEN")
72            .arg("~")
73            .arg("100000")
74            .arg("*")
75            .arg("payload")
76            .arg(serialized_data)
77            .query_async(&mut con)
78            .await;
79
80        match res {
81            Ok(_) => Ok(()),
82            Err(e) => Err(PropagateError::StreamError(e.to_string())),
83        }
84    }
85}
86
87impl Insert for ERC20::Approval {
88    async fn insert(
89        &self,
90        pool: &PgPool,
91        tx_hash: &Option<alloy_primitives::B256>,
92    ) -> Result<(), SqlError> {
93        let tx = tx_hash.as_ref().map(|v| v.as_slice());
94        let owner = self.owner.as_slice();
95        let spender = self.spender.as_slice();
96        let value = self.value.as_le_slice();
97
98        let sql = r#"INSERT INTO erc20_approval (
99            tx_hash,
100            "owner",
101            spender,
102            "value" )
103            VALUES (
104                $1, $2, $3, $4
105            ) ON CONFLICT DO NOTHING"#;
106
107        sqlx::query(sql)
108            .bind(tx)
109            .bind(owner)
110            .bind(spender)
111            .bind(value)
112            .execute(pool)
113            .await?;
114
115        Ok(())
116    }
117}
118
119impl Emit for ERC20::Approval {
120    async fn emit(
121        &self,
122        queue: &redis::Client,
123        network: &crate::networks::NetworkKind,
124    ) -> Result<(), PropagateError> {
125        let mut con = queue.get_async_connection().await?;
126
127        let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC20_Approval));
128        con.lpush(channel, serde_json::to_string(self)?).await?;
129
130        Ok(())
131    }
132}
133
134impl Stream for ERC20::Approval {
135    async fn stream(
136        &self,
137        queue: &redis::Client,
138        network: &crate::networks::NetworkKind,
139    ) -> Result<(), PropagateError> {
140        let mut con = queue.get_async_connection().await?;
141
142        let key = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC20_Approval));
143        let serialized_data = serde_json::to_string(self)?;
144
145        let res: redis::RedisResult<String> = redis::cmd("XADD")
146            .arg(&key)
147            .arg("MAXLEN")
148            .arg("~")
149            .arg("100000")
150            .arg("*")
151            .arg("payload")
152            .arg(serialized_data)
153            .query_async(&mut con)
154            .await;
155
156        match res {
157            Ok(_) => Ok(()),
158            Err(e) => Err(PropagateError::StreamError(e.to_string())),
159        }
160    }
161}