arbiter_core/
events.rs

//! The `data_collection` module provides the `EventLogger` struct for logging
//! events from the Ethereum network.
//!
//! The `EventLogger` struct contains a BTreeMap of events, where each event is
//! represented by a string key and a vector of `Event` instances.
//! It also optionally contains a path where the event logs will be stored.
//!
//! This module also provides the implementation of the `EventLogger` struct,
//! including methods for constructing a new `EventLogger`, adding an event to
//! the `EventLogger`, and writing the event logs to a file.
//!
//! # Type Parameters
//!
//! * `M` - Middleware that implements the `Middleware` trait,
//!   `std::borrow::Borrow<D>`, and has a static lifetime.
//! * `D` - Middleware that implements the `Middleware` trait, `Debug`, `Send`,
//!   `Sync`, and has a static lifetime.
//! * `E` - Type that implements the `EthLogDecode`, `Debug`, `Serialize`
//!   traits, and has a static lifetime.

21use std::{io::BufWriter, marker::PhantomData, mem::transmute, pin::Pin};
22
23use ethers::{
24    abi::RawLog,
25    contract::{builders::Event, EthLogDecode},
26    core::k256::sha2::{Digest, Sha256},
27    providers::Middleware,
28    types::{Filter, FilteredParams},
29};
30use futures_util::Stream;
31use polars::{
32    io::parquet::ParquetWriter,
33    prelude::{CsvWriter, DataFrame, NamedFrom, SerWriter},
34    series::Series,
35};
36use serde::Serialize;
37use serde_json::Value;
38use tokio::{sync::broadcast::Receiver as BroadcastReceiver, task::JoinHandle};
39
40use super::*;
41use crate::middleware::{connection::revm_logs_to_ethers_logs, ArbiterMiddleware};
42
/// Maps an event label (as passed to `Logger::with_event`) to the log filter
/// that selects its logs and a decoder closure that renders a matching
/// `RawLog` as a JSON string.
pub(crate) type FilterDecoder =
    BTreeMap<String, (FilteredParams, Box<dyn Fn(&RawLog) -> String + Send + Sync>)>;
/// `EventLogger` is a struct that logs events from the Ethereum network.
///
/// It holds a map from event labels to their filters and decoders, the
/// broadcast receiver the events arrive over, and optional output
/// configuration (directory, file name, file type) plus optional metadata
/// that is serialized alongside the events in JSON output.
pub struct Logger {
    /// Per-event filter + JSON decoder, keyed by the label given to `with_event`.
    decoder: FilterDecoder,
    /// Broadcast channel the environment pushes events over; subscribed by the
    /// first `with_event` call, `None` until then.
    receiver: Option<BroadcastReceiver<Broadcast>>,
    /// Output format; `run` defaults this to JSON when unset.
    output_file_type: Option<OutputFileType>,
    /// Output directory; `run` defaults this to "./data" when unset.
    directory: Option<String>,
    /// Output file stem (extension is added per file type); `run` defaults
    /// this to "output" when unset.
    file_name: Option<String>,
    /// Arbitrary user metadata stored with the logs (written in JSON output
    /// only — the CSV/Parquet paths write just the flattened events).
    metadata: Option<Value>,
}
67
68impl Debug for Logger {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("EventLogger")
71            .field("receiver", &self.receiver)
72            .field("output_file_type", &self.output_file_type)
73            .field("directory", &self.directory)
74            .field("file_name", &self.file_name)
75            .field("metadata", &self.metadata)
76            .finish()
77    }
78}
79
/// `OutputFileType` is an enumeration that represents the different types of
/// file formats that the `EventLogger` can output to.
#[derive(Debug, Clone, Copy, Serialize)]
pub enum OutputFileType {
    /// * `JSON` - Represents the JSON file format. When this variant is used,
    ///   the `EventLogger` will output the logged events to a JSON file. This
    ///   is also the default when no file type is configured, and the only
    ///   format that includes the optional metadata.
    JSON,
    /// * `CSV` - Represents the CSV (Comma Separated Values) file format. When
    ///   this variant is used, the `EventLogger` will output the logged events
    ///   to a CSV file built from a flattened `DataFrame` of the events.
    CSV,
    /// * `Parquet` - Represents the Parquet file format. When this variant is
    ///   used, the `EventLogger` will output the logged events to a Parquet
    ///   file. Parquet is a columnar storage file format that is optimized for
    ///   use with big data processing frameworks.
    Parquet,
}
97
98impl Logger {
99    /// Constructs a new `EventLogger`.
100    ///
101    /// # Returns
102    ///
103    /// A fresh `EventLogger` instance with an uninitialized events BTreeMap and
104    /// no specified path.
105    pub fn builder() -> Self {
106        debug!("`EventLogger` initialized");
107        Self {
108            directory: None,
109            file_name: None,
110            decoder: BTreeMap::new(),
111            receiver: None,
112            // shutdown_sender: None,
113            output_file_type: None,
114            metadata: None,
115        }
116    }
117
118    /// Adds an event to the `EventLogger`.
119    ///
120    /// # Arguments
121    ///
122    /// * `event` - The event to be added.
123    /// * `name` - The name of the event.
124    ///
125    /// # Returns
126    ///
127    /// The `EventLogger` instance with the added event.
128    pub fn with_event<S: Into<String>, D: EthLogDecode + Debug + Serialize + 'static>(
129        mut self,
130        event: Event<Arc<ArbiterMiddleware>, ArbiterMiddleware, D>,
131        name: S,
132    ) -> Self {
133        let name = name.into();
134        // Grab the connection from the client and add a new event sender so that we
135        // have a distinct channel to now receive events over
136        let event_transmuted: EventTransmuted<Arc<ArbiterMiddleware>, ArbiterMiddleware, D> =
137            unsafe { transmute(event) };
138        let middleware = event_transmuted.provider.clone();
139        let decoder = |x: &_| serde_json::to_string(&D::decode_log(x).unwrap()).unwrap();
140        let filter = event_transmuted.filter.clone();
141        self.decoder.insert(
142            name.clone(),
143            (FilteredParams::new(Some(filter)), Box::new(decoder)),
144        );
145        let connection = middleware.provider().as_ref();
146        if self.receiver.is_none() {
147            self.receiver = Some(connection.event_sender.subscribe());
148        }
149        debug!("`EventLogger` now provided with event labeled: {:?}", name);
150        self
151    }
152
153    /// Sets the directory for the `EventLogger`.
154    ///
155    /// # Arguments
156    ///
157    /// * `directory` - The directory where the event logs will be stored.
158    ///
159    /// # Returns
160    ///
161    /// The `EventLogger` instance with the specified directory.
162    pub fn directory<S: Into<String>>(mut self, path: S) -> Self {
163        let cwd = std::env::current_dir().unwrap();
164        let full_path = cwd.join(path.into());
165        self.directory = Some(full_path.to_str().unwrap().to_owned());
166        debug!("`EventLogger` output directory set to: {:?}", full_path);
167        self
168    }
169
170    /// Sets the output file name for the `EventLogger`.
171    ///
172    /// # Arguments
173    ///
174    /// * `file_name` - The file where the event logs will be stored.
175    ///
176    /// # Returns
177    ///
178    /// The `EventLogger` instance with the specified file.
179    pub fn file_name<S: Into<String>>(mut self, path: S) -> Self {
180        let path = path.into();
181        self.file_name = Some(path.clone());
182        debug!("`EventLogger` output file name set to: {:?}", path);
183        self
184    }
185
186    /// Sets the output file type for the `EventLogger`.
187    /// The default file type is JSON.
188    /// # Arguments
189    ///
190    /// * `file_type` - The file type that the event logs will be stored in.
191    ///
192    /// # Returns
193    ///
194    /// The `EventLogger` instance with the specified file type.
195    pub fn file_type(mut self, file_type: OutputFileType) -> Self {
196        self.output_file_type = Some(file_type);
197        self
198    }
199    /// Sets the metadata for the `EventLogger`.
200    ///
201    /// # Arguments
202    ///
203    /// * `metadata` - The metadata to be stored with the event logs which must
204    ///   implement the `Serialize` trait.
205    ///
206    /// # Returns
207    ///
208    /// The `EventLogger` instance with the specified metadata.
209    pub fn metadata(mut self, metadata: impl Serialize) -> Result<Self, serde_json::Error> {
210        let metadata = serde_json::to_value(metadata)?;
211        self.metadata = Some(metadata);
212        debug!("`EventLogger` metadata provided");
213        Ok(self)
214    }
215
216    /// Executes the `EventLogger`.
217    ///
218    /// This function starts the event logging process. It first deletes the
219    /// existing events directory, then creates a new directory for each
220    /// event. For each event, it creates a new CSV file and writes
221    /// the event data into the file. If the file already exists, it appends the
222    /// new data to the file.
223    ///
224    /// # Returns
225    ///
226    /// A `Result` which is:
227    ///
228    /// * `Ok(())` if the `EventLogger` ran successfully.
229    /// * `Err(RevmMiddlewareError)` if there was an error running the
230    ///   `EventLogger`.
231    ///
232    /// # Errors
233    ///
234    /// This function will return an error if there is a problem creating the
235    /// directories or files, or writing to the files.
236    pub fn run(self) -> Result<JoinHandle<()>, ArbiterCoreError> {
237        let mut receiver = self.receiver.unwrap();
238        let dir = self.directory.unwrap_or("./data".into());
239        let file_name = self.file_name.unwrap_or("output".into());
240        let file_type = self.output_file_type.unwrap_or(OutputFileType::JSON);
241        let metadata = self.metadata.clone();
242        let task = tokio::spawn(async move {
243            let mut events: BTreeMap<String, BTreeMap<String, Vec<Value>>> = BTreeMap::new();
244            while let Ok(broadcast) = receiver.recv().await {
245                match broadcast {
246                    Broadcast::StopSignal => {
247                        debug!("`EventLogger` has seen a stop signal");
248                        // create new directory with path
249                        let output_dir = std::env::current_dir().unwrap().join(dir);
250                        std::fs::create_dir_all(&output_dir).unwrap();
251                        let file_path = output_dir.join(format!("{}.json", file_name));
252                        debug!(
253                            "`EventLogger` dumping event data into: {:?}",
254                            file_path.to_str().unwrap().to_owned()
255                        );
256                        // match the file output type and write to correct file using the right file
257                        // type
258                        match file_type {
259                            OutputFileType::JSON => {
260                                let file_path = output_dir.join(format!("{}.json", file_name));
261                                let file = std::fs::File::create(file_path).unwrap();
262                                let writer = BufWriter::new(file);
263
264                                #[derive(Serialize, Clone)]
265                                struct OutputData<T> {
266                                    events: BTreeMap<String, BTreeMap<String, Vec<Value>>>,
267                                    metadata: Option<T>,
268                                }
269                                let data = OutputData { events, metadata };
270                                serde_json::to_writer(writer, &data).expect("Unable to write data");
271                            }
272                            OutputFileType::CSV => {
273                                // Write the DataFrame to a CSV file
274                                let mut df = flatten_to_data_frame(events);
275                                let file_path = output_dir.join(format!("{}.csv", file_name));
276                                let file = std::fs::File::create(file_path).unwrap_or_else(|_| {
277                                    panic!("Error creating csv file");
278                                });
279                                let mut writer = CsvWriter::new(file);
280                                writer.finish(&mut df).unwrap_or_else(|_| {
281                                    panic!("Error writing to csv file");
282                                });
283                            }
284                            OutputFileType::Parquet => {
285                                // Write the DataFrame to a parquet file
286                                let mut df = flatten_to_data_frame(events);
287                                let file_path = output_dir.join(format!("{}.parquet", file_name));
288                                let file = std::fs::File::create(file_path).unwrap_or_else(|_| {
289                                    panic!("Error creating parquet file");
290                                });
291                                let writer = ParquetWriter::new(file);
292                                writer.finish(&mut df).unwrap_or_else(|_| {
293                                    panic!("Error writing to parquet file");
294                                });
295                            }
296                        }
297                        break;
298                    }
299                    Broadcast::Event(event, receipt_data) => {
300                        trace!("`EventLogger` received an event");
301                        let ethers_logs = revm_logs_to_ethers_logs(event, &receipt_data);
302                        for log in ethers_logs {
303                            for (contract_name, (filter, decoder)) in self.decoder.iter() {
304                                if filter.filter_address(&log) && filter.filter_topics(&log) {
305                                    let cloned_logs = log.clone();
306                                    let event_as_value = serde_json::from_str::<Value>(&decoder(
307                                        &cloned_logs.into(),
308                                    ))
309                                    .unwrap();
310                                    let event_as_object = event_as_value.as_object().unwrap();
311
312                                    let contract = events.get(contract_name);
313                                    if contract.is_none() {
314                                        events.insert(contract_name.clone(), BTreeMap::new());
315                                    }
316                                    let contract = events.get_mut(contract_name).unwrap();
317
318                                    let event_name =
319                                        event_as_object.clone().keys().collect::<Vec<&String>>()[0]
320                                            .clone();
321
322                                    let event = contract.get_mut(&event_name);
323                                    if event.is_none() {
324                                        contract.insert(event_name.to_string(), vec![]);
325                                    }
326                                    let event = contract.get_mut(&event_name).unwrap();
327
328                                    for (_key, value) in event_as_object {
329                                        event.push(value.clone());
330                                    }
331                                    trace!(
332                                        "`EventLogger` successfully filtered and logged the event"
333                                    )
334                                }
335                            }
336                        }
337                    }
338                }
339            }
340        });
341        Ok(task)
342    }
343}
344
345fn flatten_to_data_frame(events: BTreeMap<String, BTreeMap<String, Vec<Value>>>) -> DataFrame {
346    // 1. Flatten the BTreeMap
347    let mut contract_names = Vec::new();
348    let mut event_names = Vec::new();
349    let mut event_values = Vec::new();
350
351    for (contract, events) in &events {
352        for (event, values) in events {
353            for value in values {
354                contract_names.push(contract.clone());
355                event_names.push(event.clone());
356                event_values.push(value.to_string());
357            }
358        }
359    }
360
361    // 2. Convert the vectors into a DataFrame
362    DataFrame::new(vec![
363        Series::new("contract_name", contract_names),
364        Series::new("event_name", event_names),
365        Series::new("event_value", event_values),
366    ])
367    .unwrap()
368}
/// Mirror of `ethers::contract::builders::Event` used by `with_event` to reach
/// that type's private `filter` and `provider` fields via `transmute`.
///
/// NOTE(review): soundness of that transmute depends on this struct's field
/// layout matching the `ethers` version in use — confirm whenever the
/// `ethers` dependency is bumped.
pub(crate) struct EventTransmuted<B, M, D> {
    /// The event filter's state
    pub filter: Filter,
    /// The borrowed middleware/provider the event was built against.
    pub(crate) provider: B,
    /// Stores the event datatype
    pub(crate) datatype: PhantomData<D>,
    /// Ties the otherwise-unused middleware type parameter `M` to the struct.
    pub(crate) _m: PhantomData<M>,
}
377
/// Returns a pinned stream of decoded occurrences of a single event type,
/// without storing them to disk.
///
/// Internally the event is registered on a throwaway `Logger` under a unique
/// ID (the hex-encoded SHA-256 of the event's serialized filter), since
/// events that are solely streamed and not stored don't need a user-visible
/// name.
pub fn stream_event<D: EthLogDecode + Debug + Serialize + 'static>(
    event: Event<Arc<ArbiterMiddleware>, ArbiterMiddleware, D>,
) -> Pin<Box<dyn Stream<Item = D> + Send + Sync>> {
    // Derive a deterministic, collision-resistant label from the filter.
    let mut hasher = Sha256::new();
    hasher.update(serde_json::to_string(&event.filter).unwrap());
    let hash = hasher.finalize();
    let id = hex::encode(hash);
    let mut logger = Logger::builder().with_event(event, id);

    // `with_event` subscribes a receiver, so `take()` yields `Some` here.
    if let Some(mut receiver) = logger.receiver.take() {
        let stream = async_stream::stream! {
            while let Ok(broadcast) = receiver.recv().await {
                match broadcast {
                    Broadcast::StopSignal => {
                        trace!("`EventLogger` has seen a stop signal");
                        break;
                    }
                    Broadcast::Event(event, receipt_data) => {
                        trace!("`EventLogger` received an event");
                        let ethers_logs = revm_logs_to_ethers_logs(event, &receipt_data);
                        for log in &ethers_logs {
                            // Yield only logs that pass the registered filter.
                            for (_id, (filter, _)) in logger.decoder.iter() {
                                if filter.filter_address(log) && filter.filter_topics(log) {
                                    let raw_log = RawLog::from(log.clone());
                                    yield D::decode_log(&raw_log).unwrap();
                                }
                            }
                        }
                    }
                }
            }
        };
        Box::pin(stream)
    } else {
        // `with_event` always populates `receiver`; reaching this is a bug.
        unreachable!()
    }
}