eventify_primitives/events/
erc777.rs

1use alloy_primitives::B256;
2use eyre::Result;
3use redis::AsyncCommands;
4use sqlx::{Error as SqlError, PgPool};
5
6use super::ERC777;
7use crate::{
8    networks::{LogKind, ResourceKind},
9    traits::{Emit, Insert, Stream},
10    PropagateError,
11};
12
13impl Insert for ERC777::Sent {
14    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
15        let tx = tx_hash.as_ref().map(|v| v.as_slice());
16        let operator = self.operator.as_slice();
17        let from = self.from.as_slice();
18        let to = self.to.as_slice();
19        let amount = self.amount.as_le_slice();
20        let data = self.data.as_slice();
21        let operator_data = self.operatorData.as_slice();
22
23        let sql = r#"INSERT INTO erc777_sent (
24            tx_hash,
25            operator,
26            "from",
27            "to",
28            amount,
29            "data",
30            operator_data )
31            VALUES (
32                $1, $2, $3, $4, $5, $6, $7
33            ) ON CONFLICT DO NOTHING"#;
34
35        sqlx::query(sql)
36            .bind(tx)
37            .bind(operator)
38            .bind(from)
39            .bind(to)
40            .bind(amount)
41            .bind(data)
42            .bind(operator_data)
43            .execute(pool)
44            .await?;
45
46        Ok(())
47    }
48}
49
50impl Emit for ERC777::Sent {
51    async fn emit(
52        &self,
53        queue: &redis::Client,
54        network: &crate::networks::NetworkKind,
55    ) -> Result<(), PropagateError> {
56        let mut con = queue.get_async_connection().await?;
57
58        let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC777_Sent));
59        con.lpush(channel, serde_json::to_string(self)?).await?;
60
61        Ok(())
62    }
63}
64
65impl Stream for ERC777::Sent {
66    async fn stream(
67        &self,
68        queue: &redis::Client,
69        network: &crate::networks::NetworkKind,
70    ) -> Result<(), PropagateError> {
71        let mut con = queue.get_async_connection().await?;
72
73        let key = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC777_Sent));
74        let serialized_data = serde_json::to_string(self)?;
75
76        let res: redis::RedisResult<String> = redis::cmd("XADD")
77            .arg(&key)
78            .arg("MAXLEN")
79            .arg("~")
80            .arg("100000")
81            .arg("*")
82            .arg("payload")
83            .arg(serialized_data)
84            .query_async(&mut con)
85            .await;
86
87        match res {
88            Ok(_) => Ok(()),
89            Err(e) => Err(PropagateError::StreamError(e.to_string())),
90        }
91    }
92}
93
94impl Insert for ERC777::Minted {
95    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
96        let tx = tx_hash.as_ref().map(|v| v.as_slice());
97        let operator = self.operator.as_slice();
98        let to = self.to.as_slice();
99        let amount = self.amount.as_le_slice();
100        let data = self.data.as_slice();
101        let operator_data = self.operatorData.as_slice();
102
103        let sql = r#"INSERT INTO erc777_minted (
104            tx_hash,
105            operator,
106            "to",
107            amount,
108            "data",
109            operator_data )
110            VALUES (
111                $1, $2, $3, $4, $5, $6
112            ) ON CONFLICT DO NOTHING"#;
113
114        sqlx::query(sql)
115            .bind(tx)
116            .bind(operator)
117            .bind(to)
118            .bind(amount)
119            .bind(data)
120            .bind(operator_data)
121            .execute(pool)
122            .await?;
123
124        Ok(())
125    }
126}
127
128impl Emit for ERC777::Minted {
129    async fn emit(
130        &self,
131        queue: &redis::Client,
132        network: &crate::networks::NetworkKind,
133    ) -> Result<(), PropagateError> {
134        let mut con = queue.get_async_connection().await?;
135
136        let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC777_Minted));
137        con.lpush(channel, serde_json::to_string(self)?).await?;
138
139        Ok(())
140    }
141}
142
143impl Stream for ERC777::Minted {
144    async fn stream(
145        &self,
146        queue: &redis::Client,
147        network: &crate::networks::NetworkKind,
148    ) -> Result<(), PropagateError> {
149        let mut con = queue.get_async_connection().await?;
150
151        let key = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC777_Minted));
152        let serialized_data = serde_json::to_string(self)?;
153
154        let res: redis::RedisResult<String> = redis::cmd("XADD")
155            .arg(&key)
156            .arg("MAXLEN")
157            .arg("~")
158            .arg("100000")
159            .arg("*")
160            .arg("payload")
161            .arg(serialized_data)
162            .query_async(&mut con)
163            .await;
164
165        match res {
166            Ok(_) => Ok(()),
167            Err(e) => Err(PropagateError::StreamError(e.to_string())),
168        }
169    }
170}
171
172impl Insert for ERC777::Burned {
173    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
174        let tx = tx_hash.as_ref().map(|v| v.as_slice());
175        let operator = self.operator.as_slice();
176        let from = self.from.as_slice();
177        let amount = self.amount.as_le_slice();
178        let data = self.data.as_slice();
179        let operator_data = self.operatorData.as_slice();
180
181        let sql = r#"INSERT INTO erc777_burned (
182            tx_hash,
183            operator,
184            "from",
185            amount,
186            "data",
187            operator_data )
188            VALUES (
189                $1, $2, $3, $4, $5, $6
190            ) ON CONFLICT DO NOTHING"#;
191
192        sqlx::query(sql)
193            .bind(tx)
194            .bind(operator)
195            .bind(from)
196            .bind(amount)
197            .bind(data)
198            .bind(operator_data)
199            .execute(pool)
200            .await?;
201
202        Ok(())
203    }
204}
205
206impl Emit for ERC777::Burned {
207    async fn emit(
208        &self,
209        queue: &redis::Client,
210        network: &crate::networks::NetworkKind,
211    ) -> Result<(), PropagateError> {
212        let mut con = queue.get_async_connection().await?;
213
214        let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC777_Burned));
215        con.lpush(channel, serde_json::to_string(self)?).await?;
216
217        Ok(())
218    }
219}
220
221impl Stream for ERC777::Burned {
222    async fn stream(
223        &self,
224        queue: &redis::Client,
225        network: &crate::networks::NetworkKind,
226    ) -> Result<(), PropagateError> {
227        let mut con = queue.get_async_connection().await?;
228
229        let key = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC777_Burned));
230        let serialized_data = serde_json::to_string(self)?;
231
232        let res: redis::RedisResult<String> = redis::cmd("XADD")
233            .arg(&key)
234            .arg("MAXLEN")
235            .arg("~")
236            .arg("100000")
237            .arg("*")
238            .arg("payload")
239            .arg(serialized_data)
240            .query_async(&mut con)
241            .await;
242
243        match res {
244            Ok(_) => Ok(()),
245            Err(e) => Err(PropagateError::StreamError(e.to_string())),
246        }
247    }
248}
249
250impl Insert for ERC777::AuthorizedOperator {
251    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
252        let tx = tx_hash.as_ref().map(|v| v.as_slice());
253        let operator = self.operator.as_slice();
254        let holder = self.holder.as_slice();
255
256        let sql = r#"INSERT INTO erc777_authorized_operator (
257            tx_hash,
258            operator,
259            holder )
260            VALUES (
261                $1, $2, $3
262            ) ON CONFLICT DO NOTHING"#;
263
264        sqlx::query(sql)
265            .bind(tx)
266            .bind(operator)
267            .bind(holder)
268            .execute(pool)
269            .await?;
270
271        Ok(())
272    }
273}
274
275impl Emit for ERC777::AuthorizedOperator {
276    async fn emit(
277        &self,
278        queue: &redis::Client,
279        network: &crate::networks::NetworkKind,
280    ) -> Result<(), PropagateError> {
281        let mut con = queue.get_async_connection().await?;
282
283        let channel = format!(
284            "{}:{}",
285            network,
286            ResourceKind::Log(LogKind::ERC777_AuthorizedOperator)
287        );
288        con.lpush(channel, serde_json::to_string(self)?).await?;
289
290        Ok(())
291    }
292}
293
294impl Stream for ERC777::AuthorizedOperator {
295    async fn stream(
296        &self,
297        queue: &redis::Client,
298        network: &crate::networks::NetworkKind,
299    ) -> Result<(), PropagateError> {
300        let mut con = queue.get_async_connection().await?;
301
302        let key = format!(
303            "{}:{}",
304            network,
305            ResourceKind::Log(LogKind::ERC777_AuthorizedOperator)
306        );
307        let serialized_data = serde_json::to_string(self)?;
308
309        let res: redis::RedisResult<String> = redis::cmd("XADD")
310            .arg(&key)
311            .arg("MAXLEN")
312            .arg("~")
313            .arg("100000")
314            .arg("*")
315            .arg("payload")
316            .arg(serialized_data)
317            .query_async(&mut con)
318            .await;
319
320        match res {
321            Ok(_) => Ok(()),
322            Err(e) => Err(PropagateError::StreamError(e.to_string())),
323        }
324    }
325}
326
327impl Insert for ERC777::RevokedOperator {
328    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
329        let tx = tx_hash.as_ref().map(|v| v.as_slice());
330        let operator = self.operator.as_slice();
331        let holder = self.holder.as_slice();
332
333        let sql = r#"INSERT INTO erc777_revoked_operator (
334            tx_hash,
335            operator,
336            holder )
337            VALUES (
338                $1, $2, $3
339            ) ON CONFLICT DO NOTHING"#;
340
341        sqlx::query(sql)
342            .bind(tx)
343            .bind(operator)
344            .bind(holder)
345            .execute(pool)
346            .await?;
347
348        Ok(())
349    }
350}
351
352impl Emit for ERC777::RevokedOperator {
353    async fn emit(
354        &self,
355        queue: &redis::Client,
356        network: &crate::networks::NetworkKind,
357    ) -> Result<(), PropagateError> {
358        let mut con = queue.get_async_connection().await?;
359
360        let channel = format!(
361            "{}:{}",
362            network,
363            ResourceKind::Log(LogKind::ERC777_RevokedOperator)
364        );
365        con.lpush(channel, serde_json::to_string(self)?).await?;
366
367        Ok(())
368    }
369}
370
371impl Stream for ERC777::RevokedOperator {
372    async fn stream(
373        &self,
374        queue: &redis::Client,
375        network: &crate::networks::NetworkKind,
376    ) -> Result<(), PropagateError> {
377        let mut con = queue.get_async_connection().await?;
378
379        let key = format!(
380            "{}:{}",
381            network,
382            ResourceKind::Log(LogKind::ERC777_RevokedOperator)
383        );
384        let serialized_data = serde_json::to_string(self)?;
385
386        let res: redis::RedisResult<String> = redis::cmd("XADD")
387            .arg(&key)
388            .arg("MAXLEN")
389            .arg("~")
390            .arg("100000")
391            .arg("*")
392            .arg("payload")
393            .arg(serialized_data)
394            .query_async(&mut con)
395            .await;
396
397        match res {
398            Ok(_) => Ok(()),
399            Err(e) => Err(PropagateError::StreamError(e.to_string())),
400        }
401    }
402}