// eventify_primitives/events/erc1155.rs
use alloy_primitives::B256;
use eyre::Result;
use redis::AsyncCommands;
use sqlx::{Error as SqlError, PgPool};

use super::ERC1155;
use crate::{
    networks::{LogKind, ResourceKind},
    traits::{Emit, Insert, Stream},
    PropagateError,
};
12
13impl Insert for ERC1155::TransferSingle {
14 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
15 let tx = tx_hash.as_ref().map(|v| v.as_slice());
16 let operator = self.operator.as_slice();
17 let from = self.from.as_slice();
18 let to = self.to.as_slice();
19 let id = self.id.as_le_slice();
20 let value = self.value.as_le_slice();
21
22 let sql = r#"INSERT INTO erc1155_transfer_single (
23 tx_hash,
24 operator,
25 "from",
26 "to",
27 id,
28 value )
29 VALUES (
30 $1, $2, $3, $4, $5, $6
31 ) ON CONFLICT DO NOTHING"#;
32
33 sqlx::query(sql)
34 .bind(tx)
35 .bind(operator)
36 .bind(from)
37 .bind(to)
38 .bind(id)
39 .bind(value)
40 .execute(pool)
41 .await?;
42
43 Ok(())
44 }
45}
46
47impl Emit for ERC1155::TransferSingle {
48 async fn emit(
49 &self,
50 queue: &redis::Client,
51 network: &crate::networks::NetworkKind,
52 ) -> Result<(), PropagateError> {
53 let mut con = queue.get_async_connection().await?;
54
55 let channel = format!(
56 "{}:{}",
57 network,
58 ResourceKind::Log(LogKind::ERC1155_TransferSingle)
59 );
60 con.lpush(channel, serde_json::to_string(self)?).await?;
61
62 Ok(())
63 }
64}
65
66impl Stream for ERC1155::TransferSingle {
67 async fn stream(
68 &self,
69 queue: &redis::Client,
70 network: &crate::networks::NetworkKind,
71 ) -> Result<(), PropagateError> {
72 let mut con = queue.get_async_connection().await?;
73
74 let key = format!(
75 "{}:{}",
76 network,
77 ResourceKind::Log(LogKind::ERC1155_TransferSingle)
78 );
79 let serialized_data = serde_json::to_string(self)?;
80
81 let res: redis::RedisResult<String> = redis::cmd("XADD")
82 .arg(&key)
83 .arg("MAXLEN")
84 .arg("~")
85 .arg("100000")
86 .arg("*")
87 .arg("payload")
88 .arg(serialized_data)
89 .query_async(&mut con)
90 .await;
91
92 match res {
93 Ok(_) => Ok(()),
94 Err(e) => Err(PropagateError::StreamError(e.to_string())),
95 }
96 }
97}
98
99impl Insert for ERC1155::TransferBatch {
100 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
101 let tx = tx_hash.as_ref().map(|v| v.as_slice());
102 let operator = self.operator.as_slice();
103 let from = self.from.as_slice();
104 let to = self.to.as_slice();
105 let ids = self.ids.iter().map(|v| v.as_le_slice()).collect::<Vec<_>>();
106 let values = self
107 .values
108 .iter()
109 .map(|v| v.as_le_slice())
110 .collect::<Vec<_>>();
111
112 let sql = r#"INSERT INTO erc1155_transfer_batch (
113 tx_hash,
114 operator,
115 "from",
116 "to",
117 ids,
118 values )
119 VALUES (
120 $1, $2, $3, $4, $5, $6
121 ) ON CONFLICT DO NOTHING"#;
122
123 sqlx::query(sql)
124 .bind(tx)
125 .bind(operator)
126 .bind(from)
127 .bind(to)
128 .bind(ids)
129 .bind(values)
130 .execute(pool)
131 .await?;
132
133 Ok(())
134 }
135}
136
137impl Emit for ERC1155::TransferBatch {
138 async fn emit(
139 &self,
140 queue: &redis::Client,
141 network: &crate::networks::NetworkKind,
142 ) -> Result<(), PropagateError> {
143 let mut con = queue.get_async_connection().await?;
144
145 let channel = format!(
146 "{}:{}",
147 network,
148 ResourceKind::Log(LogKind::ERC1155_TransferBatch)
149 );
150 con.lpush(channel, serde_json::to_string(self)?).await?;
151
152 Ok(())
153 }
154}
155
156impl Stream for ERC1155::TransferBatch {
157 async fn stream(
158 &self,
159 queue: &redis::Client,
160 network: &crate::networks::NetworkKind,
161 ) -> Result<(), PropagateError> {
162 let mut con = queue.get_async_connection().await?;
163
164 let key = format!(
165 "{}:{}",
166 network,
167 ResourceKind::Log(LogKind::ERC1155_TransferBatch)
168 );
169 let serialized_data = serde_json::to_string(self)?;
170
171 let res: redis::RedisResult<String> = redis::cmd("XADD")
172 .arg(&key)
173 .arg("MAXLEN")
174 .arg("~")
175 .arg("100000")
176 .arg("*")
177 .arg("payload")
178 .arg(serialized_data)
179 .query_async(&mut con)
180 .await;
181
182 match res {
183 Ok(_) => Ok(()),
184 Err(e) => Err(PropagateError::StreamError(e.to_string())),
185 }
186 }
187}
188
189impl Insert for ERC1155::URI {
190 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
191 let tx = tx_hash.as_ref().map(|v| v.as_slice());
192 let value = self.value.as_str();
193 let id = self.id.as_le_slice();
194
195 let sql = r#"INSERT INTO erc1155_uri (
196 tx_hash,
197 "value",
198 id )
199 VALUES (
200 $1, $2, $3
201 ) ON CONFLICT DO NOTHING"#;
202
203 sqlx::query(sql)
204 .bind(tx)
205 .bind(value)
206 .bind(id)
207 .execute(pool)
208 .await?;
209
210 Ok(())
211 }
212}
213
214impl Emit for ERC1155::URI {
215 async fn emit(
216 &self,
217 queue: &redis::Client,
218 network: &crate::networks::NetworkKind,
219 ) -> Result<(), PropagateError> {
220 let mut con = queue.get_async_connection().await?;
221
222 let channel = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC1155_URI));
223 con.lpush(channel, serde_json::to_string(self)?).await?;
224
225 Ok(())
226 }
227}
228
229impl Stream for ERC1155::URI {
230 async fn stream(
231 &self,
232 queue: &redis::Client,
233 network: &crate::networks::NetworkKind,
234 ) -> Result<(), PropagateError> {
235 let mut con = queue.get_async_connection().await?;
236
237 let key = format!("{}:{}", network, ResourceKind::Log(LogKind::ERC1155_URI));
238 let serialized_data = serde_json::to_string(self)?;
239
240 let res: redis::RedisResult<String> = redis::cmd("XADD")
241 .arg(&key)
242 .arg("MAXLEN")
243 .arg("~")
244 .arg("100000")
245 .arg("*")
246 .arg("payload")
247 .arg(serialized_data)
248 .query_async(&mut con)
249 .await;
250
251 match res {
252 Ok(_) => Ok(()),
253 Err(e) => Err(PropagateError::StreamError(e.to_string())),
254 }
255 }
256}