aptos_network_sdk/
event.rs

1use crate::{Aptos, types::Event};
2use serde_json::Value;
3use std::{collections::HashMap, sync::Arc, time::Duration};
4use tokio::sync::broadcast;
5
/// Event handler.
///
/// A stateless namespace type: it has no fields and only serves to group the
/// event-related associated functions defined in its `impl` block.
#[derive(Debug, Clone, Copy, Default)]
pub struct EventHandler;
8
9#[derive(Debug, Clone)]
10pub struct EventData {
11    pub event_type: String,
12    pub event_data: Value,
13    pub sequence_number: u64,
14    pub transaction_hash: String,
15    pub block_height: u64,
16}
17
18impl EventHandler {
19    /// Real-time monitoring of event streams
20    pub async fn start_event_stream(
21        client: Arc<Aptos>,
22        address: String,
23        event_handle: String,
24        event_sender: broadcast::Sender<EventData>,
25    ) -> Result<(), String> {
26        let mut last_sequence: Option<u64> = None;
27        loop {
28            match client
29                .get_account_event_vec(&address, &event_handle, Some(100), last_sequence)
30                .await
31            {
32                Ok(events) => {
33                    for event in events {
34                        let sequence_number = match event.sequence_number.parse::<u64>() {
35                            Ok(seq) => seq,
36                            Err(_) => continue,
37                        };
38                        // 只处理新事件
39                        if last_sequence
40                            .map(|last| sequence_number > last)
41                            .unwrap_or(true)
42                        {
43                            let event_data = EventData {
44                                event_type: event.r#type.clone(),
45                                event_data: event.data.clone(),
46                                sequence_number,
47                                transaction_hash: "hash".to_string(),
48                                block_height: client.get_chain_height().await.unwrap() as u64,
49                            };
50                            let _ = event_sender.send(event_data);
51                            last_sequence = Some(sequence_number);
52                        }
53                    }
54                }
55                Err(e) => {
56                    eprintln!("Error fetching events: {}", e);
57                }
58            }
59            tokio::time::sleep(Duration::from_secs(2)).await;
60        }
61    }
62
63    /// Event stream containing transaction information
64    pub async fn start_event_stream_with_tx_info(
65        client: Arc<Aptos>,
66        address: String,
67        event_handle: String,
68        event_sender: broadcast::Sender<EventData>,
69    ) -> Result<(), String> {
70        let mut last_sequence: Option<u64> = None;
71        loop {
72            match client
73                .get_account_event_vec(&address, &event_handle, Some(100), last_sequence)
74                .await
75            {
76                Ok(events) => {
77                    for event in events {
78                        let sequence_number = match event.sequence_number.parse::<u64>() {
79                            Ok(seq) => seq,
80                            Err(_) => continue,
81                        };
82                        if last_sequence
83                            .map(|last| sequence_number > last)
84                            .unwrap_or(true)
85                        {
86                            // Get transaction information
87                            let transaction_hash = "hash".to_string();
88                            let block_height = client.get_chain_height().await.unwrap() as u64;
89                            let event_data = EventData {
90                                event_type: event.r#type.clone(),
91                                event_data: event.data.clone(),
92                                sequence_number,
93                                transaction_hash,
94                                block_height,
95                            };
96                            let _ = event_sender.send(event_data);
97                            last_sequence = Some(sequence_number);
98                        }
99                    }
100                }
101                Err(e) => {
102                    eprintln!("Error fetching events: {}", e);
103                }
104            }
105
106            tokio::time::sleep(Duration::from_secs(2)).await;
107        }
108    }
109
110    /// event filter
111    pub fn filter_events(
112        events: Vec<EventData>,
113        filters: HashMap<String, Value>,
114    ) -> Vec<EventData> {
115        events
116            .into_iter()
117            .filter(|event| {
118                filters
119                    .iter()
120                    .all(|(key, value)| event.event_data.get(key).map_or(false, |v| v == value))
121            })
122            .collect()
123    }
124
125    /// event aggregator
126    pub fn event_aggregator(
127        events: Vec<EventData>,
128        group_by: &str,
129    ) -> HashMap<String, Vec<EventData>> {
130        let mut grouped = HashMap::new();
131
132        for event in events {
133            if let Some(group_key) = event.event_data.get(group_by).and_then(|v| v.as_str()) {
134                grouped
135                    .entry(group_key.to_string())
136                    .or_insert_with(Vec::new)
137                    .push(event);
138            }
139        }
140        grouped
141    }
142}
143
144/// Event Subscription Manager
145pub struct EventSubscriptionManager {
146    subscriptions: HashMap<String, broadcast::Sender<EventData>>,
147}
148
149impl EventSubscriptionManager {
150    pub fn new() -> Self {
151        Self {
152            subscriptions: HashMap::new(),
153        }
154    }
155
156    /// Subscribe to specific events
157    pub fn subscribe(&mut self, event_key: String) -> broadcast::Receiver<EventData> {
158        let (sender, receiver) = broadcast::channel(100);
159        self.subscriptions.insert(event_key, sender);
160        receiver
161    }
162
163    /// Publish events to subscribers
164    pub fn publish_event(&self, event_key: &str, event: EventData) -> Result<(), String> {
165        if let Some(sender) = self.subscriptions.get(event_key) {
166            let _ = sender.send(event);
167            Ok(())
168        } else {
169            Err(format!("No subscribers for event key: {}", event_key))
170        }
171    }
172
173    /// Create EventData from the original Event and publish it
174    pub fn publish_from_raw_event(
175        &self,
176        event_key: &str,
177        event: Event,
178        transaction_hash: String,
179        block_height: u64,
180    ) -> Result<(), String> {
181        let sequence_number = match event.sequence_number.parse::<u64>() {
182            Ok(seq) => seq,
183            Err(_) => return Err("Invalid sequence number".to_string()),
184        };
185        let event_data = EventData {
186            event_type: event.r#type,
187            event_data: event.data,
188            sequence_number,
189            transaction_hash,
190            block_height,
191        };
192        self.publish_event(event_key, event_data)
193    }
194}
195
/// Event handling tools.
///
/// A stateless namespace type: it has no fields and only groups the helper
/// functions defined in its `impl` block.
#[derive(Debug, Clone, Copy, Default)]
pub struct EventUtils;
198
199impl EventUtils {
200    /// Create EventData from the Event structure
201    pub fn create_event_data_from_event(
202        event: Event,
203        transaction_hash: String,
204        block_height: u64,
205    ) -> Result<EventData, String> {
206        let sequence_number = event
207            .sequence_number
208            .parse::<u64>()
209            .map_err(|_| "Invalid sequence number".to_string())?;
210        Ok(EventData {
211            event_type: event.r#type,
212            event_data: event.data,
213            sequence_number,
214            transaction_hash,
215            block_height,
216        })
217    }
218
219    /// Extract specific fields from events
220    pub fn extract_event_field(event: &EventData, field: &str) -> Option<Value> {
221        event.event_data.get(field).cloned()
222    }
223
224    /// Check if the event type matches
225    pub fn is_event_type(event: &EventData, expected_type: &str) -> bool {
226        event.event_type == expected_type
227    }
228
229    /// Batch processing of events
230    pub fn process_events_batch<F>(events: Vec<EventData>, processor: F)
231    where
232        F: Fn(EventData) -> Result<(), String>,
233    {
234        for event in events {
235            if let Err(e) = processor(event) {
236                eprintln!("Error processing event: {}", e);
237            }
238        }
239    }
240}