use std::collections::HashMap;
use std::sync::Mutex;
use serde::Serialize;
use serde_json::Value;
use crate::error::TransportError;
use crate::request::JsonRpcRequest;
use crate::transport::RpcTransport;
/// Tuning parameters for [`ReorgDetector`].
#[derive(Debug, Clone)]
pub struct ReorgConfig {
    /// How many block heights below the most recently checked block stay tracked;
    /// older entries are dropped when the window is trimmed.
    pub window_size: usize,
    /// Number of blocks subtracted from the latest tip to obtain the height that
    /// [`ReorgDetector::safe_block`] reports.
    pub safe_depth: u64,
    /// NOTE(review): not read anywhere in this file; presumably tells callers to prefer
    /// the node's `finalized` tag over the depth heuristic — confirm against call sites.
    pub use_finalized_tag: bool,
}

impl Default for ReorgConfig {
    /// Returns a 128-block window, a safe depth of 64 blocks, and `use_finalized_tag`
    /// switched on.
    fn default() -> Self {
        let window_size = 128;
        let safe_depth = 64;
        let use_finalized_tag = true;
        Self {
            window_size,
            safe_depth,
            use_finalized_tag,
        }
    }
}
/// Describes one detected chain reorganization, as produced by `ReorgDetector::check_block`.
#[derive(Debug, Clone, Serialize)]
pub struct ReorgEvent {
/// Height at which the stored hash differed from the newly reported hash.
pub fork_block: u64,
/// Number of block heights affected, counting `fork_block` itself, as computed by
/// `ReorgDetector::check_block`.
pub depth: u64,
/// Hash that was previously stored for `fork_block`.
pub old_hash: String,
/// Hash that replaced `old_hash` at `fork_block`.
pub new_hash: String,
/// Height recorded as the tip once the reorg was processed; `check_block` sets this to
/// the same value as `fork_block`.
pub current_tip: u64,
}
/// Tracks the hashes of recently checked blocks and reports a reorg when a tracked height
/// is checked again with a different hash.
///
/// Every piece of state sits behind its own `Mutex`, which is why all methods take `&self`.
pub struct ReorgDetector {
// Settings supplied to `new`.
config: ReorgConfig,
// Block height -> block hash for the recently checked blocks.
window: Mutex<HashMap<u64, String>>,
// Height passed to the most recent `check_block` call; `None` until the first call.
last_tip: Mutex<Option<u64>>,
// A mutex around a vector of boxed `dyn Fn` trips clippy's type-complexity lint; the lint
// is silenced here instead of introducing a type alias.
#[allow(clippy::type_complexity)]
callbacks: Mutex<Vec<Box<dyn Fn(&ReorgEvent) + Send + Sync>>>,
// Events returned by `check_block`, appended as they are detected.
reorg_history: Mutex<Vec<ReorgEvent>>,
}
impl ReorgDetector {
    /// Creates a detector with an empty hash window, no known tip, no callbacks and an
    /// empty reorg history.
    pub fn new(config: ReorgConfig) -> Self {
        Self {
            config,
            window: Mutex::new(HashMap::new()),
            last_tip: Mutex::new(None),
            callbacks: Mutex::new(Vec::new()),
            reorg_history: Mutex::new(Vec::new()),
        }
    }

    /// Registers `callback` to be invoked with every [`ReorgEvent`] that
    /// [`check_block`](Self::check_block) detects from now on.
    ///
    /// Callbacks run synchronously on the thread that called `check_block`, in
    /// registration order, while the callback list is locked. A callback must therefore
    /// not call `on_reorg` on the same detector, nor trigger another reorg detection on
    /// it: re-locking a `std::sync::Mutex` on the same thread deadlocks or panics.
    ///
    /// # Panics
    ///
    /// Panics if the callback list mutex was poisoned by an earlier panic.
    pub fn on_reorg<F>(&self, callback: F)
    where
        F: Fn(&ReorgEvent) + Send + Sync + 'static,
    {
        self.callbacks
            .lock()
            .expect("callbacks mutex poisoned")
            .push(Box::new(callback));
    }

    /// Records `block_hash` as the hash of `block_number` and reports a reorg if that
    /// height was already tracked with a different hash.
    ///
    /// On a mismatch every tracked height at or above `block_number` is discarded, the
    /// new hash is stored, the registered callbacks are invoked, the event is appended to
    /// the history and then returned. Otherwise the hash is stored (or refreshed) and
    /// `None` is returned. In both cases `block_number` becomes the latest tip and the
    /// window is trimmed relative to it.
    ///
    /// The event's `depth` is the number of heights from `block_number` up to the highest
    /// discarded height, inclusive. It is derived from the discarded entries rather than
    /// from the last checked height because blocks may be checked out of order; the last
    /// checked height can then lie below the fork, and subtracting from it would
    /// underflow.
    ///
    /// Only a repeated height with a different hash is detected; parent-hash linkage
    /// between consecutive heights is not examined.
    ///
    /// Callbacks run after the window and tip locks have been released, so they may call
    /// read accessors such as [`window_size`](Self::window_size) or
    /// [`safe_block`](Self::safe_block) on this detector.
    ///
    /// # Panics
    ///
    /// Panics if an internal mutex was poisoned by an earlier panic, for example one
    /// raised inside a callback.
    pub fn check_block(&self, block_number: u64, block_hash: &str) -> Option<ReorgEvent> {
        let detected = {
            let mut window = self.window.lock().expect("window mutex poisoned");
            let mut last_tip = self.last_tip.lock().expect("last_tip mutex poisoned");

            // `Some(old_hash)` only when this height is tracked under a different hash.
            let replaced_hash = window
                .get(&block_number)
                .filter(|stored| stored.as_str() != block_hash)
                .cloned();

            let detected = replaced_hash.map(|old_hash| {
                // Drop the fork block and everything built on top of it in one pass,
                // remembering how high the abandoned branch reached.
                let mut highest_discarded = block_number;
                window.retain(|&height, _| {
                    let keep = height < block_number;
                    if !keep {
                        highest_discarded = highest_discarded.max(height);
                    }
                    keep
                });
                ReorgEvent {
                    fork_block: block_number,
                    // `highest_discarded >= block_number`, so this cannot underflow.
                    depth: (highest_discarded - block_number).saturating_add(1),
                    old_hash,
                    new_hash: block_hash.to_string(),
                    current_tip: block_number,
                }
            });

            window.insert(block_number, block_hash.to_string());
            *last_tip = Some(block_number);
            Self::trim_window_inner(&self.config, &mut window, block_number);
            detected
            // Both guards are released here, before any user callback runs.
        };

        let event = detected?;
        {
            let callbacks = self.callbacks.lock().expect("callbacks mutex poisoned");
            for callback in callbacks.iter() {
                callback(&event);
            }
        }
        self.reorg_history
            .lock()
            .expect("reorg_history mutex poisoned")
            .push(event.clone());
        Some(event)
    }

    /// Removes every entry whose height is at or below `current_tip - window_size`.
    ///
    /// Nothing is removed while `current_tip` is smaller than the configured window size.
    fn trim_window_inner(
        config: &ReorgConfig,
        window: &mut HashMap<u64, String>,
        current_tip: u64,
    ) {
        // `usize` is at most 64 bits wide on supported targets, so the cast is lossless.
        if let Some(cutoff) = current_tip.checked_sub(config.window_size as u64) {
            window.retain(|&height, _| height > cutoff);
        }
    }

    /// Parses a hexadecimal JSON-RPC quantity such as `"0x64"` into a `u64`.
    ///
    /// At most one leading `0x` is stripped, and a bare hex string without the prefix is
    /// still accepted. Returns `None` for empty input, a leading sign, non-hex characters
    /// or values that do not fit in a `u64`.
    fn parse_hex_quantity(raw: &str) -> Option<u64> {
        let digits = raw.strip_prefix("0x").unwrap_or(raw);
        // `from_str_radix` would accept a leading '+', which is not a valid quantity.
        if digits.starts_with('+') {
            return None;
        }
        u64::from_str_radix(digits, 16).ok()
    }

    /// Requests block `block_number` via `eth_getBlockByNumber` and returns its hash.
    ///
    /// The second request parameter is `false`. Returns `Ok(None)` when the result has no
    /// string `hash` field, which includes a `null` result.
    ///
    /// # Errors
    ///
    /// Propagates transport failures and maps an RPC-level error through
    /// `TransportError::Rpc`.
    pub async fn fetch_block_hash(
        transport: &dyn RpcTransport,
        block_number: u64,
    ) -> Result<Option<String>, TransportError> {
        let params = vec![
            Value::String(format!("0x{:x}", block_number)),
            Value::Bool(false),
        ];
        let req = JsonRpcRequest::auto("eth_getBlockByNumber", params);
        let resp = transport.send(req).await?;
        let block = resp.into_result().map_err(TransportError::Rpc)?;
        Ok(block
            .get("hash")
            .and_then(Value::as_str)
            .map(str::to_owned))
    }

    /// Fetches the current head height and its hash, then feeds them to
    /// [`check_block`](Self::check_block).
    ///
    /// Two requests are made: `eth_blockNumber`, followed by `eth_getBlockByNumber` for
    /// the returned height.
    ///
    /// # Errors
    ///
    /// Returns `TransportError::Other` when the block number is not a hex string that
    /// fits in a `u64`, or when no hash is returned for that height. Transport and RPC
    /// errors are propagated.
    pub async fn poll_and_check(
        &self,
        transport: &dyn RpcTransport,
    ) -> Result<Option<ReorgEvent>, TransportError> {
        let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
        let resp = transport.send(req).await?;
        let value = resp.into_result().map_err(TransportError::Rpc)?;
        let block_number = value
            .as_str()
            .and_then(Self::parse_hex_quantity)
            .ok_or_else(|| TransportError::Other("invalid eth_blockNumber response".into()))?;
        let hash = Self::fetch_block_hash(transport, block_number)
            .await?
            .ok_or_else(|| TransportError::Other("block not found".into()))?;
        Ok(self.check_block(block_number, &hash))
    }

    /// Returns the latest tip minus the configured `safe_depth`.
    ///
    /// Returns `None` before the first block has been checked, or while the tip is still
    /// lower than `safe_depth`.
    ///
    /// # Panics
    ///
    /// Panics if the tip mutex was poisoned by an earlier panic.
    pub fn safe_block(&self) -> Option<u64> {
        // Copy the value out so the guard is dropped before the arithmetic.
        let tip = *self.last_tip.lock().expect("last_tip mutex poisoned");
        tip?.checked_sub(self.config.safe_depth)
    }

    /// Requests the block tagged `finalized` via `eth_getBlockByNumber` and returns its
    /// height.
    ///
    /// # Errors
    ///
    /// Returns `TransportError::Other` when the result has no `number` field holding a
    /// hex string that fits in a `u64`. Transport and RPC errors are propagated.
    pub async fn fetch_finalized_block(
        transport: &dyn RpcTransport,
    ) -> Result<u64, TransportError> {
        let req = JsonRpcRequest::auto(
            "eth_getBlockByNumber",
            vec![Value::String("finalized".into()), Value::Bool(false)],
        );
        let resp = transport.send(req).await?;
        let block = resp.into_result().map_err(TransportError::Rpc)?;
        block
            .get("number")
            .and_then(Value::as_str)
            .and_then(Self::parse_hex_quantity)
            .ok_or_else(|| TransportError::Other("invalid finalized block response".into()))
    }

    /// Returns a copy of every reorg event recorded so far, oldest first.
    ///
    /// # Panics
    ///
    /// Panics if the history mutex was poisoned by an earlier panic.
    pub fn reorg_history(&self) -> Vec<ReorgEvent> {
        self.reorg_history
            .lock()
            .expect("reorg_history mutex poisoned")
            .clone()
    }

    /// Returns the number of block heights currently tracked.
    ///
    /// This is the live entry count, not the configured `ReorgConfig::window_size`.
    ///
    /// # Panics
    ///
    /// Panics if the window mutex was poisoned by an earlier panic.
    pub fn window_size(&self) -> usize {
        self.window.lock().expect("window mutex poisoned").len()
    }

    /// Returns `true` when `block_number` is at or below [`safe_block`](Self::safe_block).
    ///
    /// Always `false` while no safe block is available.
    pub fn is_block_safe(&self, block_number: u64) -> bool {
        self.safe_block().is_some_and(|safe| block_number <= safe)
    }
}
// Unit tests for `ReorgDetector`. RPC traffic is served by an in-memory `MockTransport`,
// so no network access is needed.
#[cfg(test)]
mod tests {
use super::*;
use crate::request::{JsonRpcResponse, RpcId};
use async_trait::async_trait;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
// Transport stub that answers each request with the canned value registered for its
// method name; request parameters are ignored.
struct MockTransport {
// Method name -> result value to hand back.
responses: Mutex<HashMap<String, Value>>,
}
impl MockTransport {
// Creates a stub with no canned responses.
fn new() -> Self {
Self {
responses: Mutex::new(HashMap::new()),
}
}
// Registers (or replaces) the result returned for `method`.
fn set_response(&self, method: &str, value: Value) {
let mut map = self.responses.lock().unwrap();
map.insert(method.to_string(), value);
}
}
#[async_trait]
impl RpcTransport for MockTransport {
// Never fails: a method without a registered response yields a JSON `null` result, and
// the response id is always 1.
async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
let map = self.responses.lock().unwrap();
let result = map.get(&req.method).cloned().unwrap_or(Value::Null);
Ok(JsonRpcResponse {
jsonrpc: "2.0".into(),
id: RpcId::Number(1),
result: Some(result),
error: None,
})
}
fn url(&self) -> &str {
"mock://reorg"
}
}
// Ten distinct heights with distinct hashes never trigger a reorg and are all tracked.
#[test]
fn no_reorg_sequential_blocks() {
let detector = ReorgDetector::new(ReorgConfig::default());
for i in 100..110 {
let hash = format!("0xhash_{i}");
let result = detector.check_block(i, &hash);
assert!(result.is_none(), "block {i} should not trigger reorg");
}
assert_eq!(detector.window_size(), 10);
assert!(detector.reorg_history().is_empty());
}
// Re-checking the same height with a different hash reports a reorg at that height.
#[test]
fn detect_simple_reorg() {
let detector = ReorgDetector::new(ReorgConfig::default());
assert!(detector.check_block(100, "0xhash_A").is_none());
let event = detector
.check_block(100, "0xhash_B")
.expect("should detect reorg");
assert_eq!(event.fork_block, 100);
assert_eq!(event.old_hash, "0xhash_A");
assert_eq!(event.new_hash, "0xhash_B");
}
// With the tip at 102, a changed hash at 101 gives depth 2 (heights 101 and 102) and
// reports 101 as the new tip.
#[test]
fn reorg_event_has_correct_fields() {
let detector = ReorgDetector::new(ReorgConfig::default());
detector.check_block(100, "0xA100");
detector.check_block(101, "0xA101");
detector.check_block(102, "0xA102");
let event = detector
.check_block(101, "0xB101")
.expect("should detect reorg");
assert_eq!(event.fork_block, 101);
assert_eq!(event.depth, 2);
assert_eq!(event.old_hash, "0xA101");
assert_eq!(event.new_hash, "0xB101");
assert_eq!(event.current_tip, 101);
}
// With a window of 5, only heights 6..=10 remain after checking 1..=10; height 3 has
// been forgotten, so a different hash for it is not treated as a reorg.
#[test]
fn window_trims_old_blocks() {
let config = ReorgConfig {
window_size: 5,
..Default::default()
};
let detector = ReorgDetector::new(config);
for i in 1..=10 {
detector.check_block(i, &format!("0xhash_{i}"));
}
assert_eq!(detector.window_size(), 5);
assert!(detector.check_block(3, "0xdifferent").is_none());
}
// A registered callback runs exactly once for a single reorg.
#[test]
fn callback_fires_on_reorg() {
let detector = ReorgDetector::new(ReorgConfig::default());
let call_count = Arc::new(AtomicU32::new(0));
let count_clone = call_count.clone();
detector.on_reorg(move |_event| {
count_clone.fetch_add(1, Ordering::SeqCst);
});
detector.check_block(100, "0xhash_A");
detector.check_block(100, "0xhash_B");
assert_eq!(call_count.load(Ordering::SeqCst), 1);
}
// Every registered callback is invoked for the same reorg.
#[test]
fn multiple_callbacks() {
let detector = ReorgDetector::new(ReorgConfig::default());
let count1 = Arc::new(AtomicU32::new(0));
let count2 = Arc::new(AtomicU32::new(0));
let c1 = count1.clone();
let c2 = count2.clone();
detector.on_reorg(move |_| {
c1.fetch_add(1, Ordering::SeqCst);
});
detector.on_reorg(move |_| {
c2.fetch_add(1, Ordering::SeqCst);
});
detector.check_block(100, "0xA");
detector.check_block(100, "0xB");
assert_eq!(count1.load(Ordering::SeqCst), 1);
assert_eq!(count2.load(Ordering::SeqCst), 1);
}
// Two separate reorgs are recorded in the history in the order they occurred.
#[test]
fn reorg_history_recorded() {
let detector = ReorgDetector::new(ReorgConfig::default());
assert!(detector.reorg_history().is_empty());
detector.check_block(100, "0xA");
detector.check_block(100, "0xB");
detector.check_block(200, "0xC");
detector.check_block(200, "0xD");
let history = detector.reorg_history();
assert_eq!(history.len(), 2);
assert_eq!(history[0].fork_block, 100);
assert_eq!(history[1].fork_block, 200);
}
// The safe block is the latest tip minus `safe_depth`, and is `None` before any block
// has been checked.
#[test]
fn safe_block_calculation() {
let config = ReorgConfig {
safe_depth: 10,
..Default::default()
};
let detector = ReorgDetector::new(config);
assert!(detector.safe_block().is_none());
detector.check_block(100, "0xhash");
assert_eq!(detector.safe_block(), Some(90));
detector.check_block(150, "0xhash_150");
assert_eq!(detector.safe_block(), Some(140));
}
// A tip lower than `safe_depth` yields no safe block instead of underflowing.
#[test]
fn safe_block_returns_none_when_tip_below_depth() {
let config = ReorgConfig {
safe_depth: 100,
..Default::default()
};
let detector = ReorgDetector::new(config);
detector.check_block(50, "0xhash");
assert!(detector.safe_block().is_none());
}
// With tip 100 and depth 10 the boundary is 90: heights up to and including 90 are safe,
// anything above is not.
#[test]
fn is_block_safe_checks_depth() {
let config = ReorgConfig {
safe_depth: 10,
..Default::default()
};
let detector = ReorgDetector::new(config);
detector.check_block(100, "0xhash");
assert!(detector.is_block_safe(80)); assert!(detector.is_block_safe(90)); assert!(!detector.is_block_safe(91)); assert!(!detector.is_block_safe(100)); }
// Without any checked block nothing is considered safe, not even height 0.
#[test]
fn is_block_safe_false_without_tip() {
let detector = ReorgDetector::new(ReorgConfig::default());
assert!(!detector.is_block_safe(0));
assert!(!detector.is_block_safe(100));
}
// A reorg at 101 drops the stale entries for 101..=103, leaving 100 and the new 101;
// the replacement 102 is then accepted as a fresh block.
#[test]
fn reorg_clears_affected_blocks() {
let detector = ReorgDetector::new(ReorgConfig::default());
detector.check_block(100, "0xA100");
detector.check_block(101, "0xA101");
detector.check_block(102, "0xA102");
detector.check_block(103, "0xA103");
assert_eq!(detector.window_size(), 4);
let event = detector
.check_block(101, "0xB101")
.expect("should detect reorg");
assert_eq!(event.fork_block, 101);
assert_eq!(detector.window_size(), 2);
assert!(detector.check_block(102, "0xB102").is_none());
assert_eq!(detector.window_size(), 3);
}
// End-to-end polling against the mock: the first poll records block 100 (0x64), a second
// poll with the same hash is a no-op, and a changed hash for the same height is reported
// as a reorg.
#[tokio::test]
async fn poll_and_check_works() {
let transport = MockTransport::new();
transport.set_response(
"eth_blockNumber",
Value::String("0x64".into()), );
transport.set_response(
"eth_getBlockByNumber",
serde_json::json!({
"number": "0x64",
"hash": "0xblock_hash_100"
}),
);
let detector = ReorgDetector::new(ReorgConfig::default());
let result = detector.poll_and_check(&transport).await;
assert!(result.is_ok());
assert!(result.unwrap().is_none());
assert_eq!(detector.window_size(), 1);
let result = detector.poll_and_check(&transport).await;
assert!(result.is_ok());
assert!(result.unwrap().is_none());
// Same height, different hash: the next poll must report the reorg.
transport.set_response(
"eth_getBlockByNumber",
serde_json::json!({
"number": "0x64",
"hash": "0xreorged_hash_100"
}),
);
let result = detector.poll_and_check(&transport).await;
assert!(result.is_ok());
let event = result.unwrap().expect("should detect reorg");
assert_eq!(event.fork_block, 100);
assert_eq!(event.old_hash, "0xblock_hash_100");
assert_eq!(event.new_hash, "0xreorged_hash_100");
}
// The `hash` field of the returned block object is extracted as an owned string.
#[tokio::test]
async fn fetch_block_hash_works() {
let transport = MockTransport::new();
transport.set_response(
"eth_getBlockByNumber",
serde_json::json!({
"number": "0xc8",
"hash": "0xblock_hash_200"
}),
);
let hash = ReorgDetector::fetch_block_hash(&transport, 200).await;
assert!(hash.is_ok());
assert_eq!(hash.unwrap(), Some("0xblock_hash_200".to_string()));
}
// A block object without a `hash` field yields `Ok(None)` rather than an error.
#[tokio::test]
async fn fetch_block_hash_returns_none_for_null_hash() {
let transport = MockTransport::new();
transport.set_response(
"eth_getBlockByNumber",
serde_json::json!({
"number": "0xc8"
}),
);
let hash = ReorgDetector::fetch_block_hash(&transport, 200).await;
assert!(hash.is_ok());
assert!(hash.unwrap().is_none());
}
// The hex `number` field of the finalized block is decoded (0x1f4 == 500).
#[tokio::test]
async fn fetch_finalized_block_works() {
let transport = MockTransport::new();
transport.set_response(
"eth_getBlockByNumber",
serde_json::json!({
"number": "0x1f4",
"hash": "0xfinalized_hash"
}),
);
let block = ReorgDetector::fetch_finalized_block(&transport).await;
assert!(block.is_ok());
assert_eq!(block.unwrap(), 500); }
// `ReorgEvent` serializes to JSON containing its field names and values.
#[test]
fn reorg_event_serializable() {
let event = ReorgEvent {
fork_block: 100,
depth: 3,
old_hash: "0xold".into(),
new_hash: "0xnew".into(),
current_tip: 102,
};
let json = serde_json::to_string(&event).unwrap();
assert!(json.contains("fork_block"));
assert!(json.contains("100"));
assert!(json.contains("0xold"));
assert!(json.contains("0xnew"));
}
// Pins the documented defaults of `ReorgConfig`.
#[test]
fn default_config_values() {
let config = ReorgConfig::default();
assert_eq!(config.window_size, 128);
assert_eq!(config.safe_depth, 64);
assert!(config.use_finalized_tag);
}
}