// arbiter_core/events.rs
1//! The `data_collection` module provides the `EventLogger` struct for logging
2//! events from the Ethereum network.
3//!
4//! The `EventLogger` struct contains a BTreeMap of events, where each event is
5//! represented by a string key and a vector of `Event` instances.
6//! It also optionally contains a path where the event logs will be stored.
7//!
8//! This module also provides the implementation of the `EventLogger` struct,
9//! including methods for constructing a new `EventLogger`, adding an event to
10//! the `EventLogger`, and writing the event logs to a file.
11//!
//! The logger itself is not generic; event types are registered dynamically
//! through [`Logger::with_event`], which accepts any event type implementing
//! `EthLogDecode`, `Debug`, and `Serialize`.
20
21use std::{io::BufWriter, marker::PhantomData, mem::transmute, pin::Pin};
22
23use ethers::{
24 abi::RawLog,
25 contract::{builders::Event, EthLogDecode},
26 core::k256::sha2::{Digest, Sha256},
27 providers::Middleware,
28 types::{Filter, FilteredParams},
29};
30use futures_util::Stream;
31use polars::{
32 io::parquet::ParquetWriter,
33 prelude::{CsvWriter, DataFrame, NamedFrom, SerWriter},
34 series::Series,
35};
36use serde::Serialize;
37use serde_json::Value;
38use tokio::{sync::broadcast::Receiver as BroadcastReceiver, task::JoinHandle};
39
40use super::*;
41use crate::middleware::{connection::revm_logs_to_ethers_logs, ArbiterMiddleware};
42
/// Maps an event label (the name given in [`Logger::with_event`]) to the
/// filter used to match incoming logs and a closure that decodes a matching
/// raw log into a JSON string.
pub(crate) type FilterDecoder =
    BTreeMap<String, (FilteredParams, Box<dyn Fn(&RawLog) -> String + Send + Sync>)>;
/// `EventLogger` is a struct that logs events from the Ethereum network.
///
/// It contains a BTreeMap of decoders, where each entry is keyed by the
/// event's label and holds the filter used to match logs plus a closure that
/// decodes a matching raw log into a JSON string. It also optionally contains
/// a path where the event logs will be stored.
///
/// Build one with [`Logger::builder`], register events with
/// [`Logger::with_event`], and start it with [`Logger::run`].
pub struct Logger {
    // Per-event filter + decoder, keyed by the label given in `with_event`.
    decoder: FilterDecoder,
    // Broadcast channel from the environment; installed by the first
    // `with_event` call.
    receiver: Option<BroadcastReceiver<Broadcast>>,
    // Output format; `run` defaults to JSON when unset.
    output_file_type: Option<OutputFileType>,
    // Output directory; `run` defaults to "./data" when unset.
    directory: Option<String>,
    // Output file stem (extension added per `output_file_type`); `run`
    // defaults to "output" when unset.
    file_name: Option<String>,
    // Arbitrary user metadata; written alongside the events in JSON output.
    metadata: Option<Value>,
}
67
68impl Debug for Logger {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("EventLogger")
71 .field("receiver", &self.receiver)
72 .field("output_file_type", &self.output_file_type)
73 .field("directory", &self.directory)
74 .field("file_name", &self.file_name)
75 .field("metadata", &self.metadata)
76 .finish()
77 }
78}
79
/// `OutputFileType` is an enumeration that represents the different types of
/// file formats that the `EventLogger` can output to.
#[derive(Debug, Clone, Copy, Serialize)]
pub enum OutputFileType {
    /// * `JSON` - Represents the JSON file format. When this variant is used,
    ///   the `EventLogger` will output the logged events to a JSON file.
    ///   This is the default used by `Logger::run` when no type is set, and
    ///   the only format that also writes the optional metadata.
    JSON,
    /// * `CSV` - Represents the CSV (Comma Separated Values) file format. When
    ///   this variant is used, the `EventLogger` will output the logged events
    ///   to a CSV file.
    CSV,
    /// * `Parquet` - Represents the Parquet file format. When this variant is
    ///   used, the `EventLogger` will output the logged events to a Parquet
    ///   file. Parquet is a columnar storage file format that is optimized for
    ///   use with big data processing frameworks.
    Parquet,
}
97
98impl Logger {
99 /// Constructs a new `EventLogger`.
100 ///
101 /// # Returns
102 ///
103 /// A fresh `EventLogger` instance with an uninitialized events BTreeMap and
104 /// no specified path.
105 pub fn builder() -> Self {
106 debug!("`EventLogger` initialized");
107 Self {
108 directory: None,
109 file_name: None,
110 decoder: BTreeMap::new(),
111 receiver: None,
112 // shutdown_sender: None,
113 output_file_type: None,
114 metadata: None,
115 }
116 }
117
118 /// Adds an event to the `EventLogger`.
119 ///
120 /// # Arguments
121 ///
122 /// * `event` - The event to be added.
123 /// * `name` - The name of the event.
124 ///
125 /// # Returns
126 ///
127 /// The `EventLogger` instance with the added event.
128 pub fn with_event<S: Into<String>, D: EthLogDecode + Debug + Serialize + 'static>(
129 mut self,
130 event: Event<Arc<ArbiterMiddleware>, ArbiterMiddleware, D>,
131 name: S,
132 ) -> Self {
133 let name = name.into();
134 // Grab the connection from the client and add a new event sender so that we
135 // have a distinct channel to now receive events over
136 let event_transmuted: EventTransmuted<Arc<ArbiterMiddleware>, ArbiterMiddleware, D> =
137 unsafe { transmute(event) };
138 let middleware = event_transmuted.provider.clone();
139 let decoder = |x: &_| serde_json::to_string(&D::decode_log(x).unwrap()).unwrap();
140 let filter = event_transmuted.filter.clone();
141 self.decoder.insert(
142 name.clone(),
143 (FilteredParams::new(Some(filter)), Box::new(decoder)),
144 );
145 let connection = middleware.provider().as_ref();
146 if self.receiver.is_none() {
147 self.receiver = Some(connection.event_sender.subscribe());
148 }
149 debug!("`EventLogger` now provided with event labeled: {:?}", name);
150 self
151 }
152
153 /// Sets the directory for the `EventLogger`.
154 ///
155 /// # Arguments
156 ///
157 /// * `directory` - The directory where the event logs will be stored.
158 ///
159 /// # Returns
160 ///
161 /// The `EventLogger` instance with the specified directory.
162 pub fn directory<S: Into<String>>(mut self, path: S) -> Self {
163 let cwd = std::env::current_dir().unwrap();
164 let full_path = cwd.join(path.into());
165 self.directory = Some(full_path.to_str().unwrap().to_owned());
166 debug!("`EventLogger` output directory set to: {:?}", full_path);
167 self
168 }
169
170 /// Sets the output file name for the `EventLogger`.
171 ///
172 /// # Arguments
173 ///
174 /// * `file_name` - The file where the event logs will be stored.
175 ///
176 /// # Returns
177 ///
178 /// The `EventLogger` instance with the specified file.
179 pub fn file_name<S: Into<String>>(mut self, path: S) -> Self {
180 let path = path.into();
181 self.file_name = Some(path.clone());
182 debug!("`EventLogger` output file name set to: {:?}", path);
183 self
184 }
185
186 /// Sets the output file type for the `EventLogger`.
187 /// The default file type is JSON.
188 /// # Arguments
189 ///
190 /// * `file_type` - The file type that the event logs will be stored in.
191 ///
192 /// # Returns
193 ///
194 /// The `EventLogger` instance with the specified file type.
195 pub fn file_type(mut self, file_type: OutputFileType) -> Self {
196 self.output_file_type = Some(file_type);
197 self
198 }
199 /// Sets the metadata for the `EventLogger`.
200 ///
201 /// # Arguments
202 ///
203 /// * `metadata` - The metadata to be stored with the event logs which must
204 /// implement the `Serialize` trait.
205 ///
206 /// # Returns
207 ///
208 /// The `EventLogger` instance with the specified metadata.
209 pub fn metadata(mut self, metadata: impl Serialize) -> Result<Self, serde_json::Error> {
210 let metadata = serde_json::to_value(metadata)?;
211 self.metadata = Some(metadata);
212 debug!("`EventLogger` metadata provided");
213 Ok(self)
214 }
215
216 /// Executes the `EventLogger`.
217 ///
218 /// This function starts the event logging process. It first deletes the
219 /// existing events directory, then creates a new directory for each
220 /// event. For each event, it creates a new CSV file and writes
221 /// the event data into the file. If the file already exists, it appends the
222 /// new data to the file.
223 ///
224 /// # Returns
225 ///
226 /// A `Result` which is:
227 ///
228 /// * `Ok(())` if the `EventLogger` ran successfully.
229 /// * `Err(RevmMiddlewareError)` if there was an error running the
230 /// `EventLogger`.
231 ///
232 /// # Errors
233 ///
234 /// This function will return an error if there is a problem creating the
235 /// directories or files, or writing to the files.
236 pub fn run(self) -> Result<JoinHandle<()>, ArbiterCoreError> {
237 let mut receiver = self.receiver.unwrap();
238 let dir = self.directory.unwrap_or("./data".into());
239 let file_name = self.file_name.unwrap_or("output".into());
240 let file_type = self.output_file_type.unwrap_or(OutputFileType::JSON);
241 let metadata = self.metadata.clone();
242 let task = tokio::spawn(async move {
243 let mut events: BTreeMap<String, BTreeMap<String, Vec<Value>>> = BTreeMap::new();
244 while let Ok(broadcast) = receiver.recv().await {
245 match broadcast {
246 Broadcast::StopSignal => {
247 debug!("`EventLogger` has seen a stop signal");
248 // create new directory with path
249 let output_dir = std::env::current_dir().unwrap().join(dir);
250 std::fs::create_dir_all(&output_dir).unwrap();
251 let file_path = output_dir.join(format!("{}.json", file_name));
252 debug!(
253 "`EventLogger` dumping event data into: {:?}",
254 file_path.to_str().unwrap().to_owned()
255 );
256 // match the file output type and write to correct file using the right file
257 // type
258 match file_type {
259 OutputFileType::JSON => {
260 let file_path = output_dir.join(format!("{}.json", file_name));
261 let file = std::fs::File::create(file_path).unwrap();
262 let writer = BufWriter::new(file);
263
264 #[derive(Serialize, Clone)]
265 struct OutputData<T> {
266 events: BTreeMap<String, BTreeMap<String, Vec<Value>>>,
267 metadata: Option<T>,
268 }
269 let data = OutputData { events, metadata };
270 serde_json::to_writer(writer, &data).expect("Unable to write data");
271 }
272 OutputFileType::CSV => {
273 // Write the DataFrame to a CSV file
274 let mut df = flatten_to_data_frame(events);
275 let file_path = output_dir.join(format!("{}.csv", file_name));
276 let file = std::fs::File::create(file_path).unwrap_or_else(|_| {
277 panic!("Error creating csv file");
278 });
279 let mut writer = CsvWriter::new(file);
280 writer.finish(&mut df).unwrap_or_else(|_| {
281 panic!("Error writing to csv file");
282 });
283 }
284 OutputFileType::Parquet => {
285 // Write the DataFrame to a parquet file
286 let mut df = flatten_to_data_frame(events);
287 let file_path = output_dir.join(format!("{}.parquet", file_name));
288 let file = std::fs::File::create(file_path).unwrap_or_else(|_| {
289 panic!("Error creating parquet file");
290 });
291 let writer = ParquetWriter::new(file);
292 writer.finish(&mut df).unwrap_or_else(|_| {
293 panic!("Error writing to parquet file");
294 });
295 }
296 }
297 break;
298 }
299 Broadcast::Event(event, receipt_data) => {
300 trace!("`EventLogger` received an event");
301 let ethers_logs = revm_logs_to_ethers_logs(event, &receipt_data);
302 for log in ethers_logs {
303 for (contract_name, (filter, decoder)) in self.decoder.iter() {
304 if filter.filter_address(&log) && filter.filter_topics(&log) {
305 let cloned_logs = log.clone();
306 let event_as_value = serde_json::from_str::<Value>(&decoder(
307 &cloned_logs.into(),
308 ))
309 .unwrap();
310 let event_as_object = event_as_value.as_object().unwrap();
311
312 let contract = events.get(contract_name);
313 if contract.is_none() {
314 events.insert(contract_name.clone(), BTreeMap::new());
315 }
316 let contract = events.get_mut(contract_name).unwrap();
317
318 let event_name =
319 event_as_object.clone().keys().collect::<Vec<&String>>()[0]
320 .clone();
321
322 let event = contract.get_mut(&event_name);
323 if event.is_none() {
324 contract.insert(event_name.to_string(), vec![]);
325 }
326 let event = contract.get_mut(&event_name).unwrap();
327
328 for (_key, value) in event_as_object {
329 event.push(value.clone());
330 }
331 trace!(
332 "`EventLogger` successfully filtered and logged the event"
333 )
334 }
335 }
336 }
337 }
338 }
339 }
340 });
341 Ok(task)
342 }
343}
344
345fn flatten_to_data_frame(events: BTreeMap<String, BTreeMap<String, Vec<Value>>>) -> DataFrame {
346 // 1. Flatten the BTreeMap
347 let mut contract_names = Vec::new();
348 let mut event_names = Vec::new();
349 let mut event_values = Vec::new();
350
351 for (contract, events) in &events {
352 for (event, values) in events {
353 for value in values {
354 contract_names.push(contract.clone());
355 event_names.push(event.clone());
356 event_values.push(value.to_string());
357 }
358 }
359 }
360
361 // 2. Convert the vectors into a DataFrame
362 DataFrame::new(vec![
363 Series::new("contract_name", contract_names),
364 Series::new("event_name", event_names),
365 Series::new("event_value", event_values),
366 ])
367 .unwrap()
368}
/// Mirror of `ethers::contract::builders::Event`'s layout.
///
/// NOTE(review): `Logger::with_event` transmutes an `Event` into this type to
/// reach its private `filter` and `provider` fields, so the field order and
/// types here must stay in lockstep with the `Event` struct in the pinned
/// `ethers` version — verify on every `ethers` upgrade.
pub(crate) struct EventTransmuted<B, M, D> {
    /// The event filter's state
    pub filter: Filter,
    // The client/middleware the event was built against.
    pub(crate) provider: B,
    /// Stores the event datatype
    pub(crate) datatype: PhantomData<D>,
    // Marker for the middleware type parameter.
    pub(crate) _m: PhantomData<M>,
}
377
378/// Adds an event to the `EventLogger` and generates a unique ID for the
379/// event since we don't need to name events that are solely streamed and
380/// not stored.
381pub fn stream_event<D: EthLogDecode + Debug + Serialize + 'static>(
382 event: Event<Arc<ArbiterMiddleware>, ArbiterMiddleware, D>,
383) -> Pin<Box<dyn Stream<Item = D> + Send + Sync>> {
384 let mut hasher = Sha256::new();
385 hasher.update(serde_json::to_string(&event.filter).unwrap());
386 let hash = hasher.finalize();
387 let id = hex::encode(hash);
388 let mut logger = Logger::builder().with_event(event, id);
389
390 if let Some(mut receiver) = logger.receiver.take() {
391 let stream = async_stream::stream! {
392 while let Ok(broadcast) = receiver.recv().await {
393 match broadcast {
394 Broadcast::StopSignal => {
395 trace!("`EventLogger` has seen a stop signal");
396 break;
397 }
398 Broadcast::Event(event, receipt_data) => {
399 trace!("`EventLogger` received an event");
400 let ethers_logs = revm_logs_to_ethers_logs(event, &receipt_data);
401 for log in ðers_logs {
402 for (_id, (filter, _)) in logger.decoder.iter() {
403 if filter.filter_address(log) && filter.filter_topics(log) {
404 let raw_log = RawLog::from(log.clone());
405 yield D::decode_log(&raw_log).unwrap();
406 }
407 }
408 }
409 }
410 }
411 }
412 };
413 Box::pin(stream)
414 } else {
415 unreachable!()
416 }
417}