eventify_primitives/events/
erc4626.rs

1use alloy_primitives::B256;
2use eyre::Result;
3use redis::AsyncCommands;
4use sqlx::{Error as SqlError, PgPool};
5
6use super::ERC4626;
7use crate::{
8    networks::{LogKind, ResourceKind},
9    traits::{Emit, Insert, Stream},
10    PropagateError,
11};
12
13impl Insert for ERC4626::Deposit {
14    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
15        let tx = tx_hash.as_ref().map(|v| v.as_slice());
16        let sender = self.sender.as_slice();
17        let owner = self.owner.as_slice();
18        let assets = self.assets.as_le_slice();
19        let shares = self.shares.as_le_slice();
20
21        let sql = r#"INSERT INTO erc4626_deposit (
22            tx_hash,
23            sender,
24            "owner",
25            "assets",
26            shares )
27            VALUES (
28                $1, $2, $3, $4, $5
29            ) ON CONFLICT DO NOTHING"#;
30
31        sqlx::query(sql)
32            .bind(tx)
33            .bind(sender)
34            .bind(owner)
35            .bind(assets)
36            .bind(shares)
37            .execute(pool)
38            .await?;
39
40        Ok(())
41    }
42}
43
44impl Emit for ERC4626::Deposit {
45    async fn emit(
46        &self,
47        queue: &redis::Client,
48        network: &crate::networks::NetworkKind,
49    ) -> Result<(), PropagateError> {
50        let mut con = queue.get_async_connection().await?;
51
52        let channel = format!(
53            "{}:{}",
54            network,
55            ResourceKind::Log(LogKind::ERC4626_Deposit)
56        );
57        con.lpush(channel, serde_json::to_string(self)?).await?;
58
59        Ok(())
60    }
61}
62
63impl Stream for ERC4626::Deposit {
64    async fn stream(
65        &self,
66        queue: &redis::Client,
67        network: &crate::networks::NetworkKind,
68    ) -> Result<(), PropagateError> {
69        let mut con = queue.get_async_connection().await?;
70
71        let key = format!(
72            "{}:{}",
73            network,
74            ResourceKind::Log(LogKind::ERC4626_Deposit)
75        );
76        let serialized_data = serde_json::to_string(self)?;
77
78        let res: redis::RedisResult<String> = redis::cmd("XADD")
79            .arg(&key)
80            .arg("MAXLEN")
81            .arg("~")
82            .arg("100000")
83            .arg("*")
84            .arg("payload")
85            .arg(serialized_data)
86            .query_async(&mut con)
87            .await;
88
89        match res {
90            Ok(_) => Ok(()),
91            Err(e) => Err(PropagateError::StreamError(e.to_string())),
92        }
93    }
94}
95
96impl Insert for ERC4626::Withdraw {
97    async fn insert(&self, pool: &PgPool, tx_hash: &Option<B256>) -> Result<(), SqlError> {
98        let tx = tx_hash.as_ref().map(|v| v.as_slice());
99        let sender = self.sender.as_slice();
100        let receiver = self.receiver.as_slice();
101        let owner = self.owner.as_slice();
102        let assets = self.assets.as_le_slice();
103        let shares = self.shares.as_le_slice();
104
105        let sql = r#"INSERT INTO erc4626_withdraw (
106            tx_hash,
107            sender,
108            "receiver",
109            "owner",
110            "assets",
111            shares )
112            VALUES (
113                $1, $2, $3, $4, $5, $6
114            ) ON CONFLICT DO NOTHING"#;
115
116        sqlx::query(sql)
117            .bind(tx)
118            .bind(sender)
119            .bind(receiver)
120            .bind(owner)
121            .bind(assets)
122            .bind(shares)
123            .execute(pool)
124            .await?;
125
126        Ok(())
127    }
128}
129
130impl Emit for ERC4626::Withdraw {
131    async fn emit(
132        &self,
133        queue: &redis::Client,
134        network: &crate::networks::NetworkKind,
135    ) -> Result<(), PropagateError> {
136        let mut con = queue.get_async_connection().await?;
137
138        let channel = format!(
139            "{}:{}",
140            network,
141            ResourceKind::Log(LogKind::ERC4626_Withdraw)
142        );
143        con.lpush(channel, serde_json::to_string(self)?).await?;
144
145        Ok(())
146    }
147}
148
149impl Stream for ERC4626::Withdraw {
150    async fn stream(
151        &self,
152        queue: &redis::Client,
153        network: &crate::networks::NetworkKind,
154    ) -> Result<(), PropagateError> {
155        let mut con = queue.get_async_connection().await?;
156
157        let key = format!(
158            "{}:{}",
159            network,
160            ResourceKind::Log(LogKind::ERC4626_Withdraw)
161        );
162        let serialized_data = serde_json::to_string(self)?;
163
164        let res: redis::RedisResult<String> = redis::cmd("XADD")
165            .arg(&key)
166            .arg("MAXLEN")
167            .arg("~")
168            .arg("100000")
169            .arg("*")
170            .arg("payload")
171            .arg(serialized_data)
172            .query_async(&mut con)
173            .await;
174
175        match res {
176            Ok(_) => Ok(()),
177            Err(e) => Err(PropagateError::StreamError(e.to_string())),
178        }
179    }
180}