1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
//! This module contains functions for managing Ethereum events
use crate::{client::Web3, types::NewFilter};
use crate::{jsonrpc::error::Web3Error, types::Log};
use clarity::{
abi::{derive_signature, AbiToken, SerializedToken},
utils::bytes_to_hex_str,
};
use clarity::{Address, Uint256};
use std::time::{Duration, Instant};
use std::vec;
use tokio::time::sleep as delay_for;
/// Serializes `value` as an ABI token and returns its 32-byte static encoding, suitable
/// for use as an event topic.
///
/// The returned bytes still have to be hex encoded with `0x` prepended before they can be
/// placed in a filter.
///
/// # Panics
///
/// Panics when the token serializes to `SerializedToken::Dynamic`.
pub fn convert_to_event(value: impl Into<AbiToken>) -> [u8; 32] {
    let abi_token: AbiToken = value.into();
    if let SerializedToken::Static(word) = abi_token.serialize() {
        word
    } else {
        panic!("dyanmic types not supported!")
    }
}
/// Serializes `value` as an ABI token and returns its static encoding as a hex string
/// with `0x` prepended, ready to be used as an entry in the topics field of an event
/// filter.
///
/// # Panics
///
/// Panics when the token serializes to `SerializedToken::Dynamic`.
pub fn convert_to_event_string(value: impl Into<AbiToken>) -> String {
    let abi_token: AbiToken = value.into();
    let encoded = abi_token.serialize();
    match encoded {
        SerializedToken::Static(word) => bytes_to_data(&word),
        SerializedToken::Dynamic(_) => panic!("dyanmic types not supported!"),
    }
}
/// Hex encodes the bytes in `s` and returns the result with `0x` prepended.
pub fn bytes_to_data(s: &[u8]) -> String {
    format!("0x{}", bytes_to_hex_str(s))
}
/// Transposes a list of per-event filters into per-position topic buckets.
///
/// Each inner `Vec` of `events` is one filter whose entries are positional. The output has
/// one bucket per position, up to the length of the longest filter, and bucket `i` holds
/// the `i`-th entry of every filter that is long enough to have one. Each entry is mapped
/// as follows:
///
/// - if `derive_signature` accepts it, the `0x`-prefixed hex of the derived signature
/// - otherwise, if it is the empty string, `None`
/// - otherwise the entry itself, unchanged
///
/// Every bucket is wrapped in `Some`; an empty `events` slice yields an empty `Vec`.
fn build_final_topics(events: &[Vec<&str>]) -> Vec<Option<Vec<Option<String>>>> {
    let widest = events.iter().map(Vec::len).max().unwrap_or(0);
    (0..widest)
        .map(|position| {
            let bucket: Vec<Option<String>> = events
                .iter()
                // filters shorter than `position` contribute nothing to this bucket
                .filter_map(|filter| filter.get(position))
                .map(|&entry| match derive_signature(entry) {
                    Ok(hash) => Some(bytes_to_data(&hash)),
                    Err(_) if entry.is_empty() => None,
                    Err(_) => Some(entry.to_string()),
                })
                .collect();
            Some(bucket)
        })
        .collect()
}
impl Web3 {
/// Waits for a single event, but instead of creating a filter and checking it for
/// changes this function sleeps for the provided wait time and then queries the logs
/// once. It always waits for at least `wait_time` before progressing, regardless of
/// the outcome.
///
/// The first topic of the query is the signature derived from `event`; every entry of
/// `topics` becomes one further topic position holding the hex encoded alternatives.
/// The first returned log accepted by `local_filter` is the result.
///
/// # Errors
///
/// Returns the error from `derive_signature` if `event` is rejected, the error from
/// `eth_get_logs` if the query fails, and `Web3Error::EventNotFound` when no returned
/// log passes `local_filter`.
pub async fn wait_for_event_alt<F: Fn(Log) -> bool + 'static>(
    &self,
    wait_time: Duration,
    contract_address: Vec<Address>,
    event: &str,
    topics: Vec<Vec<[u8; 32]>>,
    local_filter: F,
) -> Result<Log, Web3Error> {
    let sig = derive_signature(event)?;
    let mut final_topics = vec![Some(vec![Some(bytes_to_data(&sig))])];
    final_topics.extend(topics.into_iter().map(|alternatives| {
        let encoded: Vec<Option<String>> = alternatives
            .into_iter()
            .map(|word| Some(bytes_to_data(&word)))
            .collect();
        Some(encoded)
    }));

    let new_filter = NewFilter {
        address: contract_address,
        from_block: None,
        to_block: None,
        topics: Some(final_topics),
    };

    delay_for(wait_time).await;

    // The filter is not used again, so it is moved into the request rather than cloned.
    let logs = self.eth_get_logs(new_filter).await?;

    // `local_filter` takes the log by value, hence the clone for each candidate.
    logs.into_iter()
        .find(|log| local_filter(log.clone()))
        .ok_or_else(|| Web3Error::EventNotFound(event.to_string()))
}
/// Sets up an event filter, waits for a single event to happen, then removes the filter.
/// Includes a local filter: a captured event that does not pass it is ignored. This
/// differs from wait_for_event_alt in that it checks for filter changes every second and
/// returns as soon as a matching event is seen, potentially well before the `wait_for`
/// time provided by the user has elapsed.
///
/// The first topic of the filter is the signature derived from `event`; every entry of
/// `topics` becomes one further topic position holding the hex encoded alternatives.
///
/// # Errors
///
/// Returns the error from `derive_signature` if `event` is rejected, the error from
/// `eth_new_filter` or `eth_get_filter_changes` if those requests fail,
/// `Web3Error::CouldNotRemoveFilter` if the filter cannot be removed after polling, and
/// `Web3Error::EventNotFound` when no matching event arrived within `wait_for`.
pub async fn wait_for_event<F: Fn(Log) -> bool + 'static>(
    &self,
    wait_for: Duration,
    contract_address: Vec<Address>,
    event: &str,
    topics: Vec<Vec<[u8; 32]>>,
    local_filter: F,
) -> Result<Log, Web3Error> {
    let sig = derive_signature(event)?;
    let mut final_topics = vec![Some(vec![Some(bytes_to_data(&sig))])];
    final_topics.extend(topics.into_iter().map(|alternatives| {
        let encoded: Vec<Option<String>> = alternatives
            .into_iter()
            .map(|word| Some(bytes_to_data(&word)))
            .collect();
        Some(encoded)
    }));

    let new_filter = NewFilter {
        address: contract_address,
        from_block: None,
        to_block: None,
        topics: Some(final_topics),
    };

    let filter_id = self.eth_new_filter(new_filter).await?;

    let start = Instant::now();
    let mut found_log = None;
    while start.elapsed() < wait_for {
        delay_for(Duration::from_secs(1)).await;
        let logs = match self.eth_get_filter_changes(filter_id).await {
            Ok(changes) => changes,
            Err(e) => {
                // Best-effort cleanup so the filter is not left installed; the polling
                // error is the one worth reporting, so a failure here is ignored.
                let _ = self.eth_uninstall_filter(filter_id).await;
                return Err(e);
            }
        };
        // `local_filter` takes the log by value, hence the clone for each candidate.
        found_log = logs.into_iter().find(|log| local_filter(log.clone()));
        if found_log.is_some() {
            // Stop polling as soon as a match is seen instead of waiting out the timeout.
            break;
        }
    }

    if let Err(e) = self.eth_uninstall_filter(filter_id).await {
        return Err(Web3Error::CouldNotRemoveFilter(format!("{e}")));
    }

    found_log.ok_or_else(|| Web3Error::EventNotFound(event.to_string()))
}
/// Checks for multiple events, each with additional topics. The first entry of each vec of
/// strings should be an event signature, followed by topics. Topics are positional, so if
/// you want to skip a topic provide an empty string. If no ending block is provided the
/// latest block number is used. This function will not wait for events to occur.
/// An empty vec on the topics filter will return all events from the target contract.
///
/// Formatting limitations for queries for multiple events matching on multiple topics:
/// some more complex queries may not work as expected.
/// Let's say we have three events
/// - FirstEvent(indexed address, indexed uint256, indexed string, string)
/// - SecondEvent(indexed address, indexed uint256, string)
/// - ThirdEvent(indexed address, indexed uint256, indexed string, string)
///
/// For this filter:
/// - FirstEvent(0x123, 1, "A")
/// - ThirdEvent(0x123, 1, "A", "B")
///
/// No results will be returned for the First event because the third topic does not have a
/// placeholder. Events for the Third event will be returned.
///
/// For this filter:
/// - FirstEvent(0x123, 1, "A", "")
/// - ThirdEvent(0x123, 1, "A", "B")
///
/// Results from both events will be returned as you might expect.
///
/// For this filter:
/// - FirstEvent(0x123, 1, "A", "")
/// - SecondEvent(0x123, 1, "A", "")
/// - ThirdEvent(0x123, 1, "A", "B")
///
/// Results from the second event will not be returned, because SecondEvent has only 2
/// indexed topics. Results from the First and Third events will be returned. Leaving out
/// the "" for the Second event will not work either.
///
/// tl;dr The maximum number of indexed topics you can match when searching multiple events
/// is the minimum number of indexed topics on any event in the search.
///
/// # Errors
///
/// Returns the error from `eth_block_number` (only queried when `end_block` is `None`) or
/// from `eth_get_logs` if either request fails.
pub async fn check_for_multiple_events(
    &self,
    start_block: Uint256,
    end_block: Option<Uint256>,
    contract_address: Vec<Address>,
    events: Vec<Vec<&str>>,
) -> Result<Vec<Log>, Web3Error> {
    // Block bounds are sent as 0x-prefixed hex strings.
    let lower_bound = format!("{start_block:#x}");
    let upper_bound = if let Some(last) = end_block {
        format!("{last:#x}")
    } else {
        format!("{:#x}", self.eth_block_number().await?)
    };

    let query = NewFilter {
        address: contract_address,
        from_block: Some(lower_bound),
        to_block: Some(upper_bound),
        topics: Some(build_final_topics(&events)),
    };
    self.eth_get_logs(query).await
}
/// Checks for an event with additional topics. The first argument should always be an
/// event signature, with the following entries being topics. Topics are positional, so if
/// you want to skip a topic provide an empty string. If no ending block is provided the
/// latest block number is used. This function will not wait for events to occur. Note this
/// is a simplified endpoint that does not fully represent the eth_getLogs endpoint, use
/// eth_get_logs for the full power of event requests.
/// An empty vec on the topics filter will return all events from the target contract.
///
/// # Errors
///
/// Returns the error from `eth_block_number` (only queried when `end_block` is `None`) or
/// from `eth_get_logs` if either request fails.
pub async fn check_for_events(
    &self,
    start_block: Uint256,
    end_block: Option<Uint256>,
    contract_address: Vec<Address>,
    events: Vec<&str>,
) -> Result<Vec<Log>, Web3Error> {
    // Block bounds are sent as 0x-prefixed hex strings.
    let from_block = Some(format!("{start_block:#x}"));
    let to_block = Some(match end_block {
        Some(last) => format!("{last:#x}"),
        None => {
            let latest_block = self.eth_block_number().await?;
            format!("{latest_block:#x}")
        }
    });

    // One topic position per entry: a derivable signature is replaced by its hex encoded
    // hash, an empty string becomes a skipped position, anything else is passed verbatim.
    let final_topics: Vec<_> = events
        .into_iter()
        .map(|entry| match derive_signature(entry) {
            Ok(sig) => Some(vec![Some(bytes_to_data(&sig))]),
            Err(_) if entry.is_empty() => None,
            Err(_) => Some(vec![Some(entry.to_string())]),
        })
        .collect();

    let new_filter = NewFilter {
        address: contract_address,
        from_block,
        to_block,
        topics: Some(final_topics),
    };
    self.eth_get_logs(new_filter).await
}
}
#[cfg(test)]
mod tests {
    use super::build_final_topics;

    /// Filters of different lengths: later buckets only contain entries from the filters
    /// that are long enough, and an empty string becomes a `None` entry.
    #[test]
    fn transpose_unequal_lengths() {
        let events = vec![vec!["123", "", "456", "789"], vec!["ABC", "DEF"]];
        let topics = build_final_topics(&events);
        println!("topics: {topics:#?}");
        assert_eq!(topics.len(), 4);

        let col0 = topics[0].as_ref().unwrap();
        assert_eq!(col0.len(), 2);
        assert!(col0[0].is_some());
        assert!(col0[1].is_some());

        let col1 = topics[1].as_ref().unwrap();
        assert_eq!(col1.len(), 2);
        assert!(col1[0].is_none());
        assert!(col1[1].is_some());

        let col2 = topics[2].as_ref().unwrap();
        assert_eq!(col2.len(), 1);
        assert!(col2[0].is_some());

        // Check the length of the fourth bucket itself, not the third one again.
        let col3 = topics[3].as_ref().unwrap();
        assert_eq!(col3.len(), 1);
        assert!(col3[0].is_some());
    }

    /// Same layout as above, but the first entries are real event signatures, so the
    /// first bucket must contain derived signature hashes.
    #[test]
    fn transpose_unequal_lengths_2() {
        let events = vec![vec!["A()", "", "456", "789"], vec!["B()", "DEF"]];
        let topics = build_final_topics(&events);
        println!("topics: {topics:#?}");
        assert_eq!(topics.len(), 4);

        let col0 = topics[0].as_ref().unwrap();
        assert_eq!(col0.len(), 2);
        assert_eq!(
            col0[0].as_deref(),
            Some("0xf446c1d0ceceeffa77e664bc78fba57853bda372d626a510062480375fa80e90")
        );
        assert!(col0[1].is_some());

        let col1 = topics[1].as_ref().unwrap();
        assert_eq!(col1.len(), 2);
        assert!(col1[0].is_none());
        assert!(col1[1].is_some());

        let col2 = topics[2].as_ref().unwrap();
        assert_eq!(col2.len(), 1);
        assert!(col2[0].is_some());

        // Check the length of the fourth bucket itself, not the third one again.
        let col3 = topics[3].as_ref().unwrap();
        assert_eq!(col3.len(), 1);
        assert!(col3[0].is_some());
    }
}