eventify_primitives/events/
erc4626.rs1use alloy_primitives::B256;
2use eyre::Result;
3use redis::AsyncCommands;
4use sqlx::{Error as SqlError, PgPool};
5
6use super::ERC4626;
7use crate::{
8 networks::{LogKind, ResourceKind},
9 traits::{Emit, Insert, Stream},
10 PropagateError,
11};
12
13impl Insert for ERC4626::Deposit {
14 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
15 let tx = tx_hash.as_ref().map(|v| v.as_slice());
16 let sender = self.sender.as_slice();
17 let owner = self.owner.as_slice();
18 let assets = self.assets.as_le_slice();
19 let shares = self.shares.as_le_slice();
20
21 let sql = r#"INSERT INTO erc4626_deposit (
22 tx_hash,
23 sender,
24 "owner",
25 "assets",
26 shares )
27 VALUES (
28 $1, $2, $3, $4, $5
29 ) ON CONFLICT DO NOTHING"#;
30
31 sqlx::query(sql)
32 .bind(tx)
33 .bind(sender)
34 .bind(owner)
35 .bind(assets)
36 .bind(shares)
37 .execute(pool)
38 .await?;
39
40 Ok(())
41 }
42}
43
44impl Emit for ERC4626::Deposit {
45 async fn emit(
46 &self,
47 queue: &redis::Client,
48 network: &crate::networks::NetworkKind,
49 ) -> Result<(), PropagateError> {
50 let mut con = queue.get_async_connection().await?;
51
52 let channel = format!(
53 "{}:{}",
54 network,
55 ResourceKind::Log(LogKind::ERC4626_Deposit)
56 );
57 con.lpush(channel, serde_json::to_string(self)?).await?;
58
59 Ok(())
60 }
61}
62
63impl Stream for ERC4626::Deposit {
64 async fn stream(
65 &self,
66 queue: &redis::Client,
67 network: &crate::networks::NetworkKind,
68 ) -> Result<(), PropagateError> {
69 let mut con = queue.get_async_connection().await?;
70
71 let key = format!(
72 "{}:{}",
73 network,
74 ResourceKind::Log(LogKind::ERC4626_Deposit)
75 );
76 let serialized_data = serde_json::to_string(self)?;
77
78 let res: redis::RedisResult<String> = redis::cmd("XADD")
79 .arg(&key)
80 .arg("MAXLEN")
81 .arg("~")
82 .arg("100000")
83 .arg("*")
84 .arg("payload")
85 .arg(serialized_data)
86 .query_async(&mut con)
87 .await;
88
89 match res {
90 Ok(_) => Ok(()),
91 Err(e) => Err(PropagateError::StreamError(e.to_string())),
92 }
93 }
94}
95
96impl Insert for ERC4626::Withdraw {
97 async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
98 let tx = tx_hash.as_ref().map(|v| v.as_slice());
99 let sender = self.sender.as_slice();
100 let receiver = self.receiver.as_slice();
101 let owner = self.owner.as_slice();
102 let assets = self.assets.as_le_slice();
103 let shares = self.shares.as_le_slice();
104
105 let sql = r#"INSERT INTO erc4626_withdraw (
106 tx_hash,
107 sender,
108 "receiver",
109 "owner",
110 "assets",
111 shares )
112 VALUES (
113 $1, $2, $3, $4, $5, $6
114 ) ON CONFLICT DO NOTHING"#;
115
116 sqlx::query(sql)
117 .bind(tx)
118 .bind(sender)
119 .bind(receiver)
120 .bind(owner)
121 .bind(assets)
122 .bind(shares)
123 .execute(pool)
124 .await?;
125
126 Ok(())
127 }
128}
129
130impl Emit for ERC4626::Withdraw {
131 async fn emit(
132 &self,
133 queue: &redis::Client,
134 network: &crate::networks::NetworkKind,
135 ) -> Result<(), PropagateError> {
136 let mut con = queue.get_async_connection().await?;
137
138 let channel = format!(
139 "{}:{}",
140 network,
141 ResourceKind::Log(LogKind::ERC4626_Withdraw)
142 );
143 con.lpush(channel, serde_json::to_string(self)?).await?;
144
145 Ok(())
146 }
147}
148
149impl Stream for ERC4626::Withdraw {
150 async fn stream(
151 &self,
152 queue: &redis::Client,
153 network: &crate::networks::NetworkKind,
154 ) -> Result<(), PropagateError> {
155 let mut con = queue.get_async_connection().await?;
156
157 let key = format!(
158 "{}:{}",
159 network,
160 ResourceKind::Log(LogKind::ERC4626_Withdraw)
161 );
162 let serialized_data = serde_json::to_string(self)?;
163
164 let res: redis::RedisResult<String> = redis::cmd("XADD")
165 .arg(&key)
166 .arg("MAXLEN")
167 .arg("~")
168 .arg("100000")
169 .arg("*")
170 .arg("payload")
171 .arg(serialized_data)
172 .query_async(&mut con)
173 .await;
174
175 match res {
176 Ok(_) => Ok(()),
177 Err(e) => Err(PropagateError::StreamError(e.to_string())),
178 }
179 }
180}