eventify_primitives/events/
erc1155.rs

1use alloy_primitives::B256;
2use eyre::Result;
3use redis::AsyncCommands;
4use sqlx::{Error as SqlError, PgPool};
5
6use super::ERC1155;
7use crate::{
8    networks::{LogKind, ResourceKind},
9    traits::{Emit, Insert, Stream},
10    PropagateError,
11};
12
13impl Insert for ERC1155::TransferSingle {
14    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
15        let tx = tx_hash.as_ref().map(|v| v.as_slice());
16        let operator = self.operator.as_slice();
17        let from = self.from.as_slice();
18        let to = self.to.as_slice();
19        let id = self.id.as_le_slice();
20        let value = self.value.as_le_slice();
21
22        let sql = r#"INSERT INTO erc1155_transfer_single (
23            tx_hash,
24            operator,
25            "from",
26            "to",
27            id,
28            value )
29            VALUES (
30                $1, $2, $3, $4, $5, $6
31            ) ON CONFLICT DO NOTHING"#;
32
33        sqlx::query(sql)
34            .bind(tx)
35            .bind(operator)
36            .bind(from)
37            .bind(to)
38            .bind(id)
39            .bind(value)
40            .execute(pool)
41            .await?;
42
43        Ok(())
44    }
45}
46
47impl Emit for ERC1155::TransferSingle {
48    async fn emit(
49        &self,
50        queue: &redis::Client,
51        network: &crate::networks::NetworkKind,
52    ) -> Result<(), PropagateError> {
53        let mut con = queue.get_async_connection().await?;
54
55        let channel = format!(
56            "{}:{}",
57            network,
58            ResourceKind::Log(LogKind::ERC1155_TransferSingle)
59        );
60        con.lpush(channel, serde_json::to_string(self)?).await?;
61
62        Ok(())
63    }
64}
65
66impl Stream for ERC1155::TransferSingle {
67    async fn stream(
68        &self,
69        queue: &redis::Client,
70        network: &crate::networks::NetworkKind,
71    ) -> Result<(), PropagateError> {
72        let mut con = queue.get_async_connection().await?;
73
74        let key = format!(
75            "{}:{}",
76            network,
77            ResourceKind::Log(LogKind::ERC1155_TransferSingle)
78        );
79        let serialized_data = serde_json::to_string(self)?;
80
81        let res: redis::RedisResult<String> = redis::cmd("XADD")
82            .arg(&key)
83            .arg("MAXLEN")
84            .arg("~")
85            .arg("100000")
86            .arg("*")
87            .arg("payload")
88            .arg(serialized_data)
89            .query_async(&mut con)
90            .await;
91
92        match res {
93            Ok(_) => Ok(()),
94            Err(e) => Err(PropagateError::StreamError(e.to_string())),
95        }
96    }
97}
98
99impl Insert for ERC1155::TransferBatch {
100    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
101        let tx = tx_hash.as_ref().map(|v| v.as_slice());
102        let operator = self.operator.as_slice();
103        let from = self.from.as_slice();
104        let to = self.to.as_slice();
105        let ids = self.ids.iter().map(|v| v.as_le_slice()).collect::<Vec<_>>();
106        let values = self
107            .values
108            .iter()
109            .map(|v| v.as_le_slice())
110            .collect::<Vec<_>>();
111
112        let sql = r#"INSERT INTO erc1155_transfer_batch (
113            tx_hash,
114            operator,
115            "from",
116            "to",
117            ids,
118            values )
119            VALUES (
120                $1, $2, $3, $4, $5, $6
121            ) ON CONFLICT DO NOTHING"#;
122
123        sqlx::query(sql)
124            .bind(tx)
125            .bind(operator)
126            .bind(from)
127            .bind(to)
128            .bind(ids)
129            .bind(values)
130            .execute(pool)
131            .await?;
132
133        Ok(())
134    }
135}
136
137impl Emit for ERC1155::TransferBatch {
138    async fn emit(
139        &self,
140        queue: &redis::Client,
141        network: &crate::networks::NetworkKind,
142    ) -> Result<(), PropagateError> {
143        let mut con = queue.get_async_connection().await?;
144
145        let channel = format!(
146            "{}:{}",
147            network,
148            ResourceKind::Log(LogKind::ERC1155_TransferBatch)
149        );
150        con.lpush(channel, serde_json::to_string(self)?).await?;
151
152        Ok(())
153    }
154}
155
156impl Stream for ERC1155::TransferBatch {
157    async fn stream(
158        &self,
159        queue: &redis::Client,
160        network: &crate::networks::NetworkKind,
161    ) -> Result<(), PropagateError> {
162        let mut con = queue.get_async_connection().await?;
163
164        let key = format!(
165            "{}:{}",
166            network,
167            ResourceKind::Log(LogKind::ERC1155_TransferBatch)
168        );
169        let serialized_data = serde_json::to_string(self)?;
170
171        let res: redis::RedisResult<String> = redis::cmd("XADD")
172            .arg(&key)
173            .arg("MAXLEN")
174            .arg("~")
175            .arg("100000")
176            .arg("*")
177            .arg("payload")
178            .arg(serialized_data)
179            .query_async(&mut con)
180            .await;
181
182        match res {
183            Ok(_) => Ok(()),
184            Err(e) => Err(PropagateError::StreamError(e.to_string())),
185        }
186    }
187}
188
189impl Insert for ERC1155::URI {
190    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
191        let tx = tx_hash.as_ref().map(|v| v.as_slice());
192        let value = self.value.as_str();
193        let id = self.id.as_le_slice();
194
195        let sql = r#"INSERT INTO erc1155_uri (
196            tx_hash,
197            "value",
198            id )
199            VALUES (
200                $1, $2, $3
201            ) ON CONFLICT DO NOTHING"#;
202
203        sqlx::query(sql)
204            .bind(tx)
205            .bind(value)
206            .bind(id)
207            .execute(pool)
208            .await?;
209
210        Ok(())
211    }
212}
213
214impl Emit for ERC1155::URI {
215    async fn emit(
216        &self,
217        queue: &redis::Client,
218        network: &crate::networks::NetworkKind,
219    ) -> Result<(), PropagateError> {
220        let mut con = queue.get_async_connection().await?;
221
222        let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC1155_URI));
223        con.lpush(channel, serde_json::to_string(self)?).await?;
224
225        Ok(())
226    }
227}
228
229impl Stream for ERC1155::URI {
230    async fn stream(
231        &self,
232        queue: &redis::Client,
233        network: &crate::networks::NetworkKind,
234    ) -> Result<(), PropagateError> {
235        let mut con = queue.get_async_connection().await?;
236
237        let key = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC1155_URI));
238        let serialized_data = serde_json::to_string(self)?;
239
240        let res: redis::RedisResult<String> = redis::cmd("XADD")
241            .arg(&key)
242            .arg("MAXLEN")
243            .arg("~")
244            .arg("100000")
245            .arg("*")
246            .arg("payload")
247            .arg(serialized_data)
248            .query_async(&mut con)
249            .await;
250
251        match res {
252            Ok(_) => Ok(()),
253            Err(e) => Err(PropagateError::StreamError(e.to_string())),
254        }
255    }
256}