1use crate::{Aptos, types::Event};
2use serde_json::Value;
3use std::{collections::HashMap, sync::Arc, time::Duration};
4use tokio::sync::broadcast;
5
6pub struct EventHandler;
8
9#[derive(Debug, Clone)]
10pub struct EventData {
11 pub event_type: String,
12 pub event_data: Value,
13 pub sequence_number: u64,
14 pub transaction_hash: String,
15 pub block_height: u64,
16}
17
18impl EventHandler {
19 pub async fn start_event_stream(
21 client: Arc<Aptos>,
22 address: String,
23 event_handle: String,
24 event_sender: broadcast::Sender<EventData>,
25 ) -> Result<(), String> {
26 let mut last_sequence: Option<u64> = None;
27 loop {
28 match client
29 .get_account_event_vec(&address, &event_handle, Some(100), last_sequence)
30 .await
31 {
32 Ok(events) => {
33 for event in events {
34 let sequence_number = match event.sequence_number.parse::<u64>() {
35 Ok(seq) => seq,
36 Err(_) => continue,
37 };
38 if last_sequence
40 .map(|last| sequence_number > last)
41 .unwrap_or(true)
42 {
43 let event_data = EventData {
44 event_type: event.r#type.clone(),
45 event_data: event.data.clone(),
46 sequence_number,
47 transaction_hash: "hash".to_string(),
48 block_height: client.get_chain_height().await.unwrap() as u64,
49 };
50 let _ = event_sender.send(event_data);
51 last_sequence = Some(sequence_number);
52 }
53 }
54 }
55 Err(e) => {
56 eprintln!("Error fetching events: {}", e);
57 }
58 }
59 tokio::time::sleep(Duration::from_secs(2)).await;
60 }
61 }
62
63 pub async fn start_event_stream_with_tx_info(
65 client: Arc<Aptos>,
66 address: String,
67 event_handle: String,
68 event_sender: broadcast::Sender<EventData>,
69 ) -> Result<(), String> {
70 let mut last_sequence: Option<u64> = None;
71 loop {
72 match client
73 .get_account_event_vec(&address, &event_handle, Some(100), last_sequence)
74 .await
75 {
76 Ok(events) => {
77 for event in events {
78 let sequence_number = match event.sequence_number.parse::<u64>() {
79 Ok(seq) => seq,
80 Err(_) => continue,
81 };
82 if last_sequence
83 .map(|last| sequence_number > last)
84 .unwrap_or(true)
85 {
86 let transaction_hash = "hash".to_string();
88 let block_height = client.get_chain_height().await.unwrap() as u64;
89 let event_data = EventData {
90 event_type: event.r#type.clone(),
91 event_data: event.data.clone(),
92 sequence_number,
93 transaction_hash,
94 block_height,
95 };
96 let _ = event_sender.send(event_data);
97 last_sequence = Some(sequence_number);
98 }
99 }
100 }
101 Err(e) => {
102 eprintln!("Error fetching events: {}", e);
103 }
104 }
105
106 tokio::time::sleep(Duration::from_secs(2)).await;
107 }
108 }
109
110 pub fn filter_events(
112 events: Vec<EventData>,
113 filters: HashMap<String, Value>,
114 ) -> Vec<EventData> {
115 events
116 .into_iter()
117 .filter(|event| {
118 filters
119 .iter()
120 .all(|(key, value)| event.event_data.get(key).map_or(false, |v| v == value))
121 })
122 .collect()
123 }
124
125 pub fn event_aggregator(
127 events: Vec<EventData>,
128 group_by: &str,
129 ) -> HashMap<String, Vec<EventData>> {
130 let mut grouped = HashMap::new();
131
132 for event in events {
133 if let Some(group_key) = event.event_data.get(group_by).and_then(|v| v.as_str()) {
134 grouped
135 .entry(group_key.to_string())
136 .or_insert_with(Vec::new)
137 .push(event);
138 }
139 }
140 grouped
141 }
142}
143
144pub struct EventSubscriptionManager {
146 subscriptions: HashMap<String, broadcast::Sender<EventData>>,
147}
148
149impl EventSubscriptionManager {
150 pub fn new() -> Self {
151 Self {
152 subscriptions: HashMap::new(),
153 }
154 }
155
156 pub fn subscribe(&mut self, event_key: String) -> broadcast::Receiver<EventData> {
158 let (sender, receiver) = broadcast::channel(100);
159 self.subscriptions.insert(event_key, sender);
160 receiver
161 }
162
163 pub fn publish_event(&self, event_key: &str, event: EventData) -> Result<(), String> {
165 if let Some(sender) = self.subscriptions.get(event_key) {
166 let _ = sender.send(event);
167 Ok(())
168 } else {
169 Err(format!("No subscribers for event key: {}", event_key))
170 }
171 }
172
173 pub fn publish_from_raw_event(
175 &self,
176 event_key: &str,
177 event: Event,
178 transaction_hash: String,
179 block_height: u64,
180 ) -> Result<(), String> {
181 let sequence_number = match event.sequence_number.parse::<u64>() {
182 Ok(seq) => seq,
183 Err(_) => return Err("Invalid sequence number".to_string()),
184 };
185 let event_data = EventData {
186 event_type: event.r#type,
187 event_data: event.data,
188 sequence_number,
189 transaction_hash,
190 block_height,
191 };
192 self.publish_event(event_key, event_data)
193 }
194}
195
196pub struct EventUtils;
198
199impl EventUtils {
200 pub fn create_event_data_from_event(
202 event: Event,
203 transaction_hash: String,
204 block_height: u64,
205 ) -> Result<EventData, String> {
206 let sequence_number = event
207 .sequence_number
208 .parse::<u64>()
209 .map_err(|_| "Invalid sequence number".to_string())?;
210 Ok(EventData {
211 event_type: event.r#type,
212 event_data: event.data,
213 sequence_number,
214 transaction_hash,
215 block_height,
216 })
217 }
218
219 pub fn extract_event_field(event: &EventData, field: &str) -> Option<Value> {
221 event.event_data.get(field).cloned()
222 }
223
224 pub fn is_event_type(event: &EventData, expected_type: &str) -> bool {
226 event.event_type == expected_type
227 }
228
229 pub fn process_events_batch<F>(events: Vec<EventData>, processor: F)
231 where
232 F: Fn(EventData) -> Result<(), String>,
233 {
234 for event in events {
235 if let Err(e) = processor(event) {
236 eprintln!("Error processing event: {}", e);
237 }
238 }
239 }
240}