eventify_primitives/events/
erc721.rs

1use alloy_primitives::B256;
2use eyre::Result;
3use redis::AsyncCommands;
4use sqlx::{Error as SqlError, PgPool};
5
6use super::ERC721;
7use crate::{
8    networks::{LogKind, ResourceKind},
9    traits::{Emit, Insert, Stream},
10    PropagateError,
11};
12
13impl Insert for ERC721::Transfer {
14    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
15        let tx = tx_hash.as_ref().map(|v| v.as_slice());
16        let from = self.from.as_slice();
17        let to = self.to.as_slice();
18        let token_id = self.tokenId.as_le_slice();
19
20        let sql = r#"INSERT INTO erc721_transfer (
21            tx_hash,
22            "from",
23            "to",
24            token_id )
25            VALUES (
26                $1, $2, $3, $4
27            ) ON CONFLICT DO NOTHING"#;
28
29        sqlx::query(sql)
30            .bind(tx)
31            .bind(from)
32            .bind(to)
33            .bind(token_id)
34            .execute(pool)
35            .await?;
36
37        Ok(())
38    }
39}
40
41impl Emit for ERC721::Transfer {
42    async fn emit(
43        &self,
44        queue: &redis::Client,
45        network: &crate::networks::NetworkKind,
46    ) -> Result<(), PropagateError> {
47        let mut con = queue.get_async_connection().await?;
48
49        let channel = format!(
50            "{}:{}",
51            network,
52            ResourceKind::Log(LogKind::ERC721_Transfer)
53        );
54        con.lpush(channel, serde_json::to_string(self)?).await?;
55
56        Ok(())
57    }
58}
59
60impl Stream for ERC721::Transfer {
61    async fn stream(
62        &self,
63        queue: &redis::Client,
64        network: &crate::networks::NetworkKind,
65    ) -> Result<(), PropagateError> {
66        let mut con = queue.get_async_connection().await?;
67
68        let key = format!(
69            "{}:{}",
70            network,
71            ResourceKind::Log(LogKind::ERC721_Transfer)
72        );
73        let serialized_data = serde_json::to_string(self)?;
74
75        let res: redis::RedisResult<String> = redis::cmd("XADD")
76            .arg(&key)
77            .arg("MAXLEN")
78            .arg("~")
79            .arg("100000")
80            .arg("*")
81            .arg("payload")
82            .arg(serialized_data)
83            .query_async(&mut con)
84            .await;
85
86        match res {
87            Ok(_) => Ok(()),
88            Err(e) => Err(PropagateError::StreamError(e.to_string())),
89        }
90    }
91}
92
93impl Insert for ERC721::Approval {
94    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
95        let tx = tx_hash.as_ref().map(|v| v.as_slice());
96        let owner = self.owner.as_slice();
97        let approved = self.approved.as_slice();
98        let token_id = self.tokenId.as_le_slice();
99
100        let sql = r#"INSERT INTO erc721_approval (
101            tx_hash,
102            "owner",
103            approved,
104            token_id )
105            VALUES (
106                $1, $2, $3, $4
107            ) ON CONFLICT DO NOTHING"#;
108
109        sqlx::query(sql)
110            .bind(tx)
111            .bind(owner)
112            .bind(approved)
113            .bind(token_id)
114            .execute(pool)
115            .await?;
116
117        Ok(())
118    }
119}
120
121impl Emit for ERC721::Approval {
122    async fn emit(
123        &self,
124        queue: &redis::Client,
125        network: &crate::networks::NetworkKind,
126    ) -> Result<(), PropagateError> {
127        let mut con = queue.get_async_connection().await?;
128
129        let channel = format!(
130            "{}:{}",
131            network,
132            ResourceKind::Log(LogKind::ERC721_Approval)
133        );
134        con.lpush(channel, serde_json::to_string(self)?).await?;
135
136        Ok(())
137    }
138}
139
140impl Stream for ERC721::Approval {
141    async fn stream(
142        &self,
143        queue: &redis::Client,
144        network: &crate::networks::NetworkKind,
145    ) -> Result<(), PropagateError> {
146        let mut con = queue.get_async_connection().await?;
147
148        let key = format!(
149            "{}:{}",
150            network,
151            ResourceKind::Log(LogKind::ERC721_Approval)
152        );
153        let serialized_data = serde_json::to_string(self)?;
154
155        let res: redis::RedisResult<String> = redis::cmd("XADD")
156            .arg(&key)
157            .arg("MAXLEN")
158            .arg("~")
159            .arg("100000")
160            .arg("*")
161            .arg("payload")
162            .arg(serialized_data)
163            .query_async(&mut con)
164            .await;
165
166        match res {
167            Ok(_) => Ok(()),
168            Err(e) => Err(PropagateError::StreamError(e.to_string())),
169        }
170    }
171}
172
173impl Insert for ERC721::ApprovalForAll {
174    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
175        let tx = tx_hash.as_ref().map(|v| v.as_slice());
176        let owner = self.owner.as_slice();
177        let operator = self.operator.as_slice();
178        let approved = self.approved;
179
180        let sql = r#"INSERT INTO erc_approval_for_all (
181            tx_hash,
182            "owner",
183            operator,
184            approved )
185            VALUES (
186                $1, $2, $3, $4
187            ) ON CONFLICT DO NOTHING"#;
188
189        sqlx::query(sql)
190            .bind(tx)
191            .bind(owner)
192            .bind(operator)
193            .bind(approved)
194            .execute(pool)
195            .await?;
196
197        Ok(())
198    }
199}
200
201impl Emit for ERC721::ApprovalForAll {
202    async fn emit(
203        &self,
204        queue: &redis::Client,
205        network: &crate::networks::NetworkKind,
206    ) -> Result<(), PropagateError> {
207        let mut con = queue.get_async_connection().await?;
208
209        let channel = format!(
210            "{}:{}",
211            network,
212            ResourceKind::Log(LogKind::ERC721_ApprovalForAll)
213        );
214        con.lpush(channel, serde_json::to_string(self)?).await?;
215
216        Ok(())
217    }
218}
219
220impl Stream for ERC721::ApprovalForAll {
221    async fn stream(
222        &self,
223        queue: &redis::Client,
224        network: &crate::networks::NetworkKind,
225    ) -> Result<(), PropagateError> {
226        let mut con = queue.get_async_connection().await?;
227
228        let key = format!(
229            "{}:{}",
230            network,
231            ResourceKind::Log(LogKind::ERC721_ApprovalForAll)
232        );
233        let serialized_data = serde_json::to_string(self)?;
234
235        let res: redis::RedisResult<String> = redis::cmd("XADD")
236            .arg(&key)
237            .arg("MAXLEN")
238            .arg("~")
239            .arg("100000")
240            .arg("*")
241            .arg("payload")
242            .arg(serialized_data)
243            .query_async(&mut con)
244            .await;
245
246        match res {
247            Ok(_) => Ok(()),
248            Err(e) => Err(PropagateError::StreamError(e.to_string())),
249        }
250    }
251}