use std::collections::HashSet;
use std::sync::Mutex;
use serde_json::Value;
use crate::error::TransportError;
use crate::request::JsonRpcRequest;
use crate::transport::RpcTransport;
/// Tunable settings for a [`PendingPoolMonitor`].
#[derive(Debug, Clone)]
pub struct PendingPoolConfig {
    /// Poll interval in milliseconds. This file only stores the value and
    /// exposes it through `PendingPoolMonitor::poll_interval_ms`.
    pub poll_interval_ms: u64,
    /// Upper bound on the number of hashes `PendingPoolMonitor::watch` will
    /// accept into the watch set.
    pub max_monitored: usize,
}

impl Default for PendingPoolConfig {
    /// Builds a configuration with a 2000 ms poll interval and room for 256
    /// monitored hashes.
    fn default() -> Self {
        const POLL_INTERVAL_MS: u64 = 2_000;
        const MAX_MONITORED: usize = 256;

        PendingPoolConfig {
            max_monitored: MAX_MONITORED,
            poll_interval_ms: POLL_INTERVAL_MS,
        }
    }
}
/// Outcome of a transaction status lookup.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PendingTxStatus {
    /// The transaction lookup returned a value but the receipt lookup did not.
    Pending,
    /// A receipt was returned; `block_number` is the value decoded from it.
    Included { block_number: u64 },
    /// Neither the receipt lookup nor the transaction lookup returned a value.
    NotFound,
}

impl std::fmt::Display for PendingTxStatus {
    /// Renders the status as `pending`, `included(block=N)` or `not_found`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the `Included` variant needs formatting machinery; the other
        // two are fixed labels written out verbatim.
        let label = match self {
            Self::Included { block_number } => {
                return write!(f, "included(block={block_number})");
            }
            Self::Pending => "pending",
            Self::NotFound => "not_found",
        };
        f.write_str(label)
    }
}
/// Bounded set of transaction hashes to keep an eye on, plus a stateless
/// helper for querying a transaction's status over JSON-RPC.
pub struct PendingPoolMonitor {
    /// Capacity limit and poll interval supplied at construction.
    config: PendingPoolConfig,
    /// Hashes currently being watched; the mutex allows mutation via `&self`.
    watched: Mutex<HashSet<String>>,
}

impl PendingPoolMonitor {
    /// Creates a monitor with an empty watch set.
    pub fn new(config: PendingPoolConfig) -> Self {
        let watched = Mutex::new(HashSet::new());
        Self { config, watched }
    }

    /// Adds `tx_hash` to the watch set.
    ///
    /// Returns `true` only if the hash was newly inserted. Returns `false`
    /// when the set already holds `max_monitored` entries, or when the hash
    /// is already present.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn watch(&self, tx_hash: String) -> bool {
        let mut set = self.watched.lock().unwrap();
        let has_room = set.len() < self.config.max_monitored;
        // Short-circuit keeps the insert from happening once the set is full.
        has_room && set.insert(tx_hash)
    }

    /// Removes `tx_hash` from the watch set; a no-op if it is not present.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn unwatch(&self, tx_hash: &str) {
        self.watched.lock().unwrap().remove(tx_hash);
    }

    /// Returns a snapshot of the watched hashes in the set's iteration order
    /// (which is unspecified for a `HashSet`).
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn watched(&self) -> Vec<String> {
        let set = self.watched.lock().unwrap();
        let mut hashes = Vec::with_capacity(set.len());
        hashes.extend(set.iter().cloned());
        hashes
    }

    /// Returns how many hashes are currently watched.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn count(&self) -> usize {
        self.watched.lock().unwrap().len()
    }

    /// Returns the configured poll interval in milliseconds.
    pub fn poll_interval_ms(&self) -> u64 {
        self.config.poll_interval_ms
    }

    /// Looks up the status of `tx_hash` through `transport`.
    ///
    /// Sends `eth_getTransactionReceipt` first. A non-null result yields
    /// [`PendingTxStatus::Included`]. Otherwise `eth_getTransactionByHash` is
    /// sent: a null result yields [`PendingTxStatus::NotFound`], anything
    /// else [`PendingTxStatus::Pending`].
    ///
    /// The block number is parsed as hexadecimal from the receipt's
    /// `blockNumber` string after stripping every leading `0x`. If the field
    /// is absent, not a string, or not valid hex, `block_number` is `0`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `transport.send`, and maps an error result
    /// in either response to `TransportError::Rpc`.
    pub async fn check_status(
        transport: &dyn RpcTransport,
        tx_hash: &str,
    ) -> Result<PendingTxStatus, TransportError> {
        // Both calls take the same single-element parameter list.
        let hash_param = || vec![Value::String(tx_hash.to_string())];

        let receipt = transport
            .send(JsonRpcRequest::auto(
                "eth_getTransactionReceipt",
                hash_param(),
            ))
            .await?
            .into_result()
            .map_err(TransportError::Rpc)?;

        if !receipt.is_null() {
            // NOTE(review): a receipt without a usable `blockNumber` is
            // reported as included in block 0 — confirm that this fallback is
            // intended rather than an error or `Pending`.
            let block_number = receipt
                .get("blockNumber")
                .and_then(Value::as_str)
                .map(|hex| u64::from_str_radix(hex.trim_start_matches("0x"), 16).unwrap_or(0))
                .unwrap_or(0);
            return Ok(PendingTxStatus::Included { block_number });
        }

        let tx = transport
            .send(JsonRpcRequest::auto(
                "eth_getTransactionByHash",
                hash_param(),
            ))
            .await?
            .into_result()
            .map_err(TransportError::Rpc)?;

        Ok(if tx.is_null() {
            PendingTxStatus::NotFound
        } else {
            PendingTxStatus::Pending
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Insertion, duplicate rejection, removal and listing on a default monitor.
    #[test]
    fn monitor_watch_unwatch() {
        let pool = PendingPoolMonitor::new(PendingPoolConfig::default());

        // A fresh hash is accepted; repeating it is rejected and the size is unchanged.
        assert!(pool.watch(String::from("0xabc")));
        assert_eq!(pool.count(), 1);
        assert!(!pool.watch(String::from("0xabc")));
        assert_eq!(pool.count(), 1);

        // A second, distinct hash grows the set.
        assert!(pool.watch(String::from("0xdef")));
        assert_eq!(pool.count(), 2);

        // Removing the first hash leaves only the second one.
        pool.unwatch("0xabc");
        assert_eq!(pool.count(), 1);

        let remaining = pool.watched();
        assert_eq!(remaining.len(), 1);
        assert!(remaining.iter().any(|hash| hash.as_str() == "0xdef"));
    }

    /// The watch set refuses new hashes once `max_monitored` is reached and
    /// accepts them again after a slot is freed.
    #[test]
    fn monitor_max_capacity() {
        let pool = PendingPoolMonitor::new(PendingPoolConfig {
            max_monitored: 2,
            poll_interval_ms: 1000,
        });

        assert!(pool.watch(String::from("0x1")));
        assert!(pool.watch(String::from("0x2")));

        // Full: the third hash is rejected.
        assert!(!pool.watch(String::from("0x3")));
        assert_eq!(pool.count(), 2);

        // Freeing a slot lets the previously rejected hash in.
        pool.unwatch("0x1");
        assert!(pool.watch(String::from("0x3")));
        assert_eq!(pool.count(), 2);
    }

    /// `Display` strings and equality semantics of `PendingTxStatus`.
    #[test]
    fn pending_status_enum() {
        let waiting = PendingTxStatus::Pending;
        assert_eq!(waiting.to_string(), "pending");

        let mined = PendingTxStatus::Included { block_number: 42 };
        assert_eq!(mined.to_string(), "included(block=42)");

        let missing = PendingTxStatus::NotFound;
        assert_eq!(missing.to_string(), "not_found");

        assert_eq!(PendingTxStatus::Pending, PendingTxStatus::Pending);
        assert_ne!(PendingTxStatus::Pending, PendingTxStatus::NotFound);

        // `Included` compares by its block number.
        let at_block = |block_number| PendingTxStatus::Included { block_number };
        assert_eq!(at_block(10), at_block(10));
        assert_ne!(at_block(10), at_block(20));
    }
}