use std::sync::Weak;
use super::*;
use crate::environment::{InstructionSender, OutcomeReceiver, OutcomeSender};
/// In-process transport that this file implements `JsonRpcClient` and
/// `PubsubClient` for, built from an `Environment` via `From<&Environment>`.
#[derive(Debug)]
pub struct Connection {
/// Weak handle to the environment's instruction sender; it is produced by
/// downgrading `environment.socket.instruction_sender`, so it does not keep
/// that sender alive on its own.
pub(crate) instruction_sender: Weak<InstructionSender>,
/// Sending half of the unbounded crossbeam channel created for this
/// connection in `From<&Environment>`.
pub(crate) outcome_sender: OutcomeSender,
/// Receiving half of the same per-connection outcome channel.
pub(crate) outcome_receiver: OutcomeReceiver,
/// Clone of the environment's `event_broadcaster`.
pub(crate) event_sender: BroadcastSender<Broadcast>,
/// Filters owned by this connection, keyed by filter ID. Entries are read by
/// the `eth_getFilterChanges` request and removed by `subscribe` /
/// `unsubscribe`.
pub(crate) filter_receivers: Arc<Mutex<HashMap<ethers::types::U256, FilterReceiver>>>,
}
impl From<&Environment> for Connection {
    /// Builds a `Connection` wired to `environment`'s socket.
    ///
    /// The connection receives a weak handle to the environment's instruction
    /// sender, a clone of its event broadcaster, a fresh unbounded outcome
    /// channel of its own, and an empty filter table.
    fn from(environment: &Environment) -> Self {
        let socket = &environment.socket;
        let (outcome_sender, outcome_receiver) = crossbeam_channel::unbounded();
        let filter_receivers = Arc::new(Mutex::new(HashMap::new()));
        Self {
            // Only a weak reference is kept, so this connection never extends
            // the lifetime of the environment's sender.
            instruction_sender: Arc::downgrade(&socket.instruction_sender),
            outcome_sender,
            outcome_receiver,
            event_sender: socket.event_broadcaster.clone(),
            filter_receivers,
        }
    }
}
#[async_trait::async_trait]
impl JsonRpcClient for Connection {
    type Error = ProviderError;

    /// Handles a JSON-RPC style request against this in-process connection.
    ///
    /// Only `eth_getFilterChanges` is supported. Its `params` must serialize
    /// to an array whose first element is the filter ID as a hexadecimal
    /// string. The logs returned are those of at most one pending broadcast
    /// for that filter, restricted to the filter's addresses and topics, and
    /// are converted into `R` through a JSON round-trip.
    ///
    /// # Errors
    ///
    /// Returns `ProviderError::CustomError` when:
    /// - `params` is not an array or is an empty array,
    /// - the first parameter is not a string or is not valid hexadecimal,
    /// - the filter ID is not owned by this connection,
    /// - the broadcaster has sent `Broadcast::StopSignal`,
    /// - `method` is anything other than `eth_getFilterChanges`.
    ///
    /// Serialization failures are propagated from `serde_json`.
    async fn request<T: Serialize + Send + Sync, R: DeserializeOwned>(
        &self,
        method: &str,
        params: T,
    ) -> Result<R, ProviderError> {
        match method {
            "eth_getFilterChanges" => {
                trace!("Getting filter changes...");
                let value = serde_json::to_value(&params)?;

                // `first()` instead of `[0]`: an empty params array must be
                // reported as an error, not panic.
                let id_str = value
                    .as_array()
                    .and_then(|params| params.first())
                    .ok_or_else(|| {
                        ProviderError::CustomError(
                            "The params value passed to the `Connection` via a `request` was empty.\n\
                             This is likely due to not specifying a specific `Filter` ID!"
                                .to_string(),
                        )
                    })?
                    .as_str()
                    .ok_or_else(|| {
                        ProviderError::CustomError(
                            "The params value passed to the `Connection` via a `request` could not be later cast to `str`!".to_string(),
                        )
                    })?;

                let id = ethers::types::U256::from_str_radix(id_str, 16).map_err(|e| {
                    ProviderError::CustomError(format!(
                        "The `str` representation of the filter ID could not be cast into `U256` due to: {:?}!",
                        e
                    ))
                })?;

                // The guard is never held across an `.await`; this arm has none.
                let mut filter_receivers = self.filter_receivers.lock().unwrap();
                let filter_receiver = filter_receivers.get_mut(&id).ok_or_else(|| {
                    ProviderError::CustomError(
                        "The filter ID does not seem to match any that this client owns!"
                            .to_string(),
                    )
                })?;

                let filtered_params = FilteredParams::new(Some(filter_receiver.filter.clone()));
                let mut logs = vec![];

                // NOTE(review): a single `try_recv` consumes at most one
                // broadcast per call, and any `Err` it returns is treated as
                // "nothing new" -- confirm this is the intended polling
                // contract.
                if let Some(receiver) = filter_receiver.receiver.as_mut() {
                    if let Ok(broadcast) = receiver.try_recv() {
                        match broadcast {
                            Broadcast::Event(received_logs, receipt_data) => {
                                let ethers_logs =
                                    revm_logs_to_ethers_logs(received_logs, &receipt_data);
                                logs.extend(ethers_logs.into_iter().filter(|log| {
                                    filtered_params.filter_address(log)
                                        && filtered_params.filter_topics(log)
                                }));
                            }
                            Broadcast::StopSignal => {
                                return Err(ProviderError::CustomError(
                                    "The `EventBroadcaster` has stopped!".to_string(),
                                ));
                            }
                        }
                    }
                }

                // Round-trip through JSON so the logs can be returned as the
                // caller-chosen `R`.
                let logs_str = serde_json::to_string(&logs)?;
                let logs_deserializeowned: R = serde_json::from_str(&logs_str)?;
                Ok(logs_deserializeowned)
            }
            val => Err(ProviderError::CustomError(format!(
                "The method `{}` is not supported by the `Connection`!",
                val
            ))),
        }
    }
}
impl PubsubClient for Connection {
type NotificationStream = Pin<Box<dyn Stream<Item = Box<RawValue>> + Send>>;
fn subscribe<T: Into<ethers::types::U256>>(
&self,
id: T,
) -> Result<Self::NotificationStream, Self::Error> {
let id = id.into();
debug!("Subscribing to filter with ID: {:?}", id);
let mut filter_receiver = self
.filter_receivers
.lock()
.unwrap()
.remove(&id)
.take()
.unwrap();
let mut receiver = filter_receiver.receiver.take().unwrap();
let stream = async_stream::stream! {
while let Ok(broadcast) = receiver.recv().await {
match broadcast {
Broadcast::StopSignal => {
break;
}
Broadcast::Event(logs, receipt_data) => {
let filtered_params =
FilteredParams::new(Some(filter_receiver.filter.clone()));
let ethers_logs = revm_logs_to_ethers_logs(logs, &receipt_data);
for log in ethers_logs {
if filtered_params.filter_address(&log)
&& filtered_params.filter_topics(&log)
{
let raw_log = match serde_json::to_string(&log) {
Ok(log) => log,
Err(e) => {
eprintln!("Error serializing log: {}", e);
continue;
}
};
let raw_log = match RawValue::from_string(raw_log) {
Ok(log) => log,
Err(e) => {
eprintln!("Error creating RawValue: {}", e);
continue;
}
};
yield raw_log;
}
}
}
}
}
};
Ok(Box::pin(stream))
}
fn unsubscribe<T: Into<ethers::types::U256>>(&self, id: T) -> Result<(), Self::Error> {
let id = id.into();
debug!("Unsubscribing from filter with ID: {:?}", id);
if self.filter_receivers.lock().unwrap().remove(&id).is_some() {
Ok(())
} else {
Err(ProviderError::CustomError(
"The filter ID does not seem to match any that this client owns!".to_string(),
))
}
}
}
/// A registered filter together with the broadcast receiver that feeds it.
/// Instances are stored in `Connection::filter_receivers`, keyed by filter ID.
#[derive(Debug)]
pub(crate) struct FilterReceiver {
/// Filter used to build the `FilteredParams` that select logs by address and
/// topics.
pub(crate) filter: Filter,
/// Receiver of `Broadcast` messages. It is an `Option` because `subscribe`
/// takes it out with `take()` and moves it into the notification stream.
pub(crate) receiver: Option<BroadcastReceiver<Broadcast>>,
}
/// Converts revm logs into ethers logs, one output log per input log and in
/// the same order.
///
/// The address, topics and data are taken from each revm log. The block
/// number and transaction index are copied from `receipt_data`. The block
/// and transaction hashes are set to the default (`H256::default()`) value.
#[inline]
pub fn revm_logs_to_ethers_logs(revm_logs: Vec<Log>, receipt_data: &ReceiptData) -> Vec<eLog> {
    revm_logs
        .into_iter()
        .map(|revm_log| {
            // Topics are borrowed from the log, so collect them before the
            // data payload is moved out of it below.
            let topics = revm_log.topics().iter().map(recast_b256).collect();
            eLog {
                address: eAddress::from(revm_log.address.into_array()),
                topics,
                data: eBytes::from(revm_log.data.data.0),
                block_hash: Some(H256::default()),
                block_number: Some(receipt_data.block_number),
                transaction_hash: Some(H256::default()),
                transaction_index: Some(receipt_data.transaction_index),
                // NOTE(review): every log is given index 0 regardless of its
                // position in `revm_logs` -- confirm this is intended.
                log_index: Some(eU256::from(0)),
                transaction_log_index: None,
                log_type: None,
                removed: None,
            }
        })
        .collect()
}
/// Re-wraps the raw bytes of a revm `B256` as an ethers `H256`.
#[inline]
pub fn recast_b256(input: &revm::primitives::B256) -> ethers::types::H256 {
    // The return type selects the `H256` conversion for the inner byte array.
    input.0.into()
}