use std::{io::BufWriter, marker::PhantomData, mem::transmute, pin::Pin};
use ethers::{
abi::RawLog,
contract::{builders::Event, EthLogDecode},
core::k256::sha2::{Digest, Sha256},
providers::Middleware,
types::{Filter, FilteredParams},
};
use futures_util::Stream;
use polars::{
io::parquet::ParquetWriter,
prelude::{CsvWriter, DataFrame, NamedFrom, SerWriter},
series::Series,
};
use serde::Serialize;
use serde_json::Value;
use tokio::{sync::broadcast::Receiver as BroadcastReceiver, task::JoinHandle};
use super::*;
use crate::middleware::{connection::revm_logs_to_ethers_logs, ArbiterMiddleware};
/// Maps an event's label to the filter used to match its logs and a closure
/// that decodes a matching raw log into a JSON string.
pub(crate) type FilterDecoder =
    BTreeMap<String, (FilteredParams, Box<dyn Fn(&RawLog) -> String + Send + Sync>)>;
/// Collects decoded contract events from an environment's broadcast channel
/// and writes them to disk (JSON, CSV, or Parquet) when a stop signal arrives.
///
/// Built incrementally via [`Logger::builder`] and the `with_*`/setter
/// methods, then started with `run`.
pub struct Logger {
    // Event label -> (log filter, closure decoding a raw log to a JSON string).
    decoder: FilterDecoder,
    // Receiving end of the environment's broadcast channel; installed by the
    // first `with_event` call, so `None` until at least one event is added.
    receiver: Option<BroadcastReceiver<Broadcast>>,
    // On-disk format; `run` defaults this to JSON when unset.
    output_file_type: Option<OutputFileType>,
    // Output directory; `run` defaults this to "./data" when unset.
    directory: Option<String>,
    // Output file stem (extension chosen by the file type); `run` defaults
    // this to "output" when unset.
    file_name: Option<String>,
    // Optional user-supplied metadata, serialized alongside events when the
    // output format is JSON (CSV/Parquet outputs do not include it).
    metadata: Option<Value>,
}
impl Debug for Logger {
    // Hand-written because `decoder` holds boxed closures, which do not
    // implement `Debug`; that field is therefore omitted from the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // NOTE(review): the struct is printed under the name "EventLogger"
        // even though the type is `Logger` — the `debug!`/`trace!` messages in
        // this module also say "EventLogger", so this looks like a historical
        // name; confirm before renaming the label.
        f.debug_struct("EventLogger")
            .field("receiver", &self.receiver)
            .field("output_file_type", &self.output_file_type)
            .field("directory", &self.directory)
            .field("file_name", &self.file_name)
            .field("metadata", &self.metadata)
            .finish()
    }
}
/// Supported on-disk formats for logged event data.
#[derive(Debug, Clone, Copy, Serialize)]
pub enum OutputFileType {
    /// Nested JSON document (the only format that also carries the optional
    /// metadata attached via `Logger::metadata`).
    JSON,
    /// Flattened three-column CSV table (contract, event, value).
    CSV,
    /// Flattened three-column Apache Parquet table (contract, event, value).
    Parquet,
}
impl Logger {
pub fn builder() -> Self {
debug!("`EventLogger` initialized");
Self {
directory: None,
file_name: None,
decoder: BTreeMap::new(),
receiver: None,
output_file_type: None,
metadata: None,
}
}
pub fn with_event<S: Into<String>, D: EthLogDecode + Debug + Serialize + 'static>(
mut self,
event: Event<Arc<ArbiterMiddleware>, ArbiterMiddleware, D>,
name: S,
) -> Self {
let name = name.into();
let event_transmuted: EventTransmuted<Arc<ArbiterMiddleware>, ArbiterMiddleware, D> =
unsafe { transmute(event) };
let middleware = event_transmuted.provider.clone();
let decoder = |x: &_| serde_json::to_string(&D::decode_log(x).unwrap()).unwrap();
let filter = event_transmuted.filter.clone();
self.decoder.insert(
name.clone(),
(FilteredParams::new(Some(filter)), Box::new(decoder)),
);
let connection = middleware.provider().as_ref();
if self.receiver.is_none() {
self.receiver = Some(connection.event_sender.subscribe());
}
debug!("`EventLogger` now provided with event labeled: {:?}", name);
self
}
pub fn directory<S: Into<String>>(mut self, path: S) -> Self {
let cwd = std::env::current_dir().unwrap();
let full_path = cwd.join(path.into());
self.directory = Some(full_path.to_str().unwrap().to_owned());
debug!("`EventLogger` output directory set to: {:?}", full_path);
self
}
pub fn file_name<S: Into<String>>(mut self, path: S) -> Self {
let path = path.into();
self.file_name = Some(path.clone());
debug!("`EventLogger` output file name set to: {:?}", path);
self
}
pub fn file_type(mut self, file_type: OutputFileType) -> Self {
self.output_file_type = Some(file_type);
self
}
pub fn metadata(mut self, metadata: impl Serialize) -> Result<Self, serde_json::Error> {
let metadata = serde_json::to_value(metadata)?;
self.metadata = Some(metadata);
debug!("`EventLogger` metadata provided");
Ok(self)
}
pub fn run(self) -> Result<JoinHandle<()>, ArbiterCoreError> {
let mut receiver = self.receiver.unwrap();
let dir = self.directory.unwrap_or("./data".into());
let file_name = self.file_name.unwrap_or("output".into());
let file_type = self.output_file_type.unwrap_or(OutputFileType::JSON);
let metadata = self.metadata.clone();
let task = tokio::spawn(async move {
let mut events: BTreeMap<String, BTreeMap<String, Vec<Value>>> = BTreeMap::new();
while let Ok(broadcast) = receiver.recv().await {
match broadcast {
Broadcast::StopSignal => {
debug!("`EventLogger` has seen a stop signal");
let output_dir = std::env::current_dir().unwrap().join(dir);
std::fs::create_dir_all(&output_dir).unwrap();
let file_path = output_dir.join(format!("{}.json", file_name));
debug!(
"`EventLogger` dumping event data into: {:?}",
file_path.to_str().unwrap().to_owned()
);
match file_type {
OutputFileType::JSON => {
let file_path = output_dir.join(format!("{}.json", file_name));
let file = std::fs::File::create(file_path).unwrap();
let writer = BufWriter::new(file);
#[derive(Serialize, Clone)]
struct OutputData<T> {
events: BTreeMap<String, BTreeMap<String, Vec<Value>>>,
metadata: Option<T>,
}
let data = OutputData { events, metadata };
serde_json::to_writer(writer, &data).expect("Unable to write data");
}
OutputFileType::CSV => {
let mut df = flatten_to_data_frame(events);
let file_path = output_dir.join(format!("{}.csv", file_name));
let file = std::fs::File::create(file_path).unwrap_or_else(|_| {
panic!("Error creating csv file");
});
let mut writer = CsvWriter::new(file);
writer.finish(&mut df).unwrap_or_else(|_| {
panic!("Error writing to csv file");
});
}
OutputFileType::Parquet => {
let mut df = flatten_to_data_frame(events);
let file_path = output_dir.join(format!("{}.parquet", file_name));
let file = std::fs::File::create(file_path).unwrap_or_else(|_| {
panic!("Error creating parquet file");
});
let writer = ParquetWriter::new(file);
writer.finish(&mut df).unwrap_or_else(|_| {
panic!("Error writing to parquet file");
});
}
}
break;
}
Broadcast::Event(event, receipt_data) => {
trace!("`EventLogger` received an event");
let ethers_logs = revm_logs_to_ethers_logs(event, &receipt_data);
for log in ethers_logs {
for (contract_name, (filter, decoder)) in self.decoder.iter() {
if filter.filter_address(&log) && filter.filter_topics(&log) {
let cloned_logs = log.clone();
let event_as_value = serde_json::from_str::<Value>(&decoder(
&cloned_logs.into(),
))
.unwrap();
let event_as_object = event_as_value.as_object().unwrap();
let contract = events.get(contract_name);
if contract.is_none() {
events.insert(contract_name.clone(), BTreeMap::new());
}
let contract = events.get_mut(contract_name).unwrap();
let event_name =
event_as_object.clone().keys().collect::<Vec<&String>>()[0]
.clone();
let event = contract.get_mut(&event_name);
if event.is_none() {
contract.insert(event_name.to_string(), vec![]);
}
let event = contract.get_mut(&event_name).unwrap();
for (_key, value) in event_as_object {
event.push(value.clone());
}
trace!(
"`EventLogger` successfully filtered and logged the event"
)
}
}
}
}
}
}
});
Ok(task)
}
}
/// Flattens the nested `contract -> event -> values` map into a three-column
/// `DataFrame` (`contract_name`, `event_name`, `event_value`), one row per
/// logged value, preserving the maps' key order.
fn flatten_to_data_frame(events: BTreeMap<String, BTreeMap<String, Vec<Value>>>) -> DataFrame {
    let mut contract_names = Vec::new();
    let mut event_names = Vec::new();
    let mut event_values = Vec::new();
    // Walk every (contract, event, value) triple in a single flat iterator.
    let rows = events.iter().flat_map(|(contract, contract_events)| {
        contract_events.iter().flat_map(move |(event, values)| {
            values.iter().map(move |value| (contract, event, value))
        })
    });
    for (contract, event, value) in rows {
        contract_names.push(contract.clone());
        event_names.push(event.clone());
        event_values.push(value.to_string());
    }
    DataFrame::new(vec![
        Series::new("contract_name", contract_names),
        Series::new("event_name", event_names),
        Series::new("event_value", event_values),
    ])
    .unwrap()
}
/// Mirror of `ethers::contract::builders::Event`'s private fields.
///
/// `Logger::with_event` transmutes an `Event` into this type to reach its
/// otherwise-private `filter` and `provider`. For that `transmute` to be
/// sound, the generic parameters, field order, and field types here MUST stay
/// identical to the upstream `Event` struct — any divergence is undefined
/// behavior. Verify against the pinned `ethers` version when upgrading.
pub(crate) struct EventTransmuted<B, M, D> {
    // Log filter the event builder was configured with.
    pub filter: Filter,
    // The middleware/provider handle (`B` is `Arc<ArbiterMiddleware>` here).
    pub(crate) provider: B,
    pub(crate) datatype: PhantomData<D>,
    pub(crate) _m: PhantomData<M>,
}
/// Turns an `ethers` event builder into a stream of decoded `D` values.
///
/// Internally registers the event with a throwaway [`Logger`] (labeled by a
/// SHA-256 hash of its filter) and yields every broadcast log that passes the
/// event's filter, until a stop signal closes the stream.
pub fn stream_event<D: EthLogDecode + Debug + Serialize + 'static>(
    event: Event<Arc<ArbiterMiddleware>, ArbiterMiddleware, D>,
) -> Pin<Box<dyn Stream<Item = D> + Send + Sync>> {
    // Hash the filter before `with_event` consumes the builder.
    let mut hasher = Sha256::new();
    hasher.update(serde_json::to_string(&event.filter).unwrap());
    let id = hex::encode(hasher.finalize());
    let mut logger = Logger::builder().with_event(event, id);
    let mut receiver = match logger.receiver.take() {
        Some(receiver) => receiver,
        // `with_event` always installs a receiver, so this arm cannot run.
        None => unreachable!(),
    };
    Box::pin(async_stream::stream! {
        while let Ok(broadcast) = receiver.recv().await {
            // Pull the payload out first; a stop signal ends the stream.
            let (event, receipt_data) = match broadcast {
                Broadcast::StopSignal => {
                    trace!("`EventLogger` has seen a stop signal");
                    break;
                }
                Broadcast::Event(event, receipt_data) => (event, receipt_data),
            };
            trace!("`EventLogger` received an event");
            let ethers_logs = revm_logs_to_ethers_logs(event, &receipt_data);
            for log in &ethers_logs {
                // Only one entry exists (the event registered above).
                for (filter, _) in logger.decoder.values() {
                    if filter.filter_address(log) && filter.filter_topics(log) {
                        yield D::decode_log(&RawLog::from(log.clone())).unwrap();
                    }
                }
            }
        }
    })
}