1use std::collections::HashMap;
21use std::sync::Mutex;
22
23use serde::Serialize;
24use serde_json::Value;
25
26use crate::error::TransportError;
27use crate::request::JsonRpcRequest;
28use crate::transport::RpcTransport;
29
30#[derive(Debug, Clone)]
36pub struct ReorgConfig {
37 pub window_size: usize,
39 pub safe_depth: u64,
41 pub use_finalized_tag: bool,
43}
44
45impl Default for ReorgConfig {
46 fn default() -> Self {
47 Self {
48 window_size: 128,
49 safe_depth: 64,
50 use_finalized_tag: true,
51 }
52 }
53}
54
55#[derive(Debug, Clone, Serialize)]
61pub struct ReorgEvent {
62 pub fork_block: u64,
64 pub depth: u64,
66 pub old_hash: String,
68 pub new_hash: String,
70 pub current_tip: u64,
72}
73
74pub struct ReorgDetector {
86 config: ReorgConfig,
87 window: Mutex<HashMap<u64, String>>,
89 last_tip: Mutex<Option<u64>>,
91 #[allow(clippy::type_complexity)]
93 callbacks: Mutex<Vec<Box<dyn Fn(&ReorgEvent) + Send + Sync>>>,
94 reorg_history: Mutex<Vec<ReorgEvent>>,
96}
97
98impl ReorgDetector {
99 pub fn new(config: ReorgConfig) -> Self {
101 Self {
102 config,
103 window: Mutex::new(HashMap::new()),
104 last_tip: Mutex::new(None),
105 callbacks: Mutex::new(Vec::new()),
106 reorg_history: Mutex::new(Vec::new()),
107 }
108 }
109
110 pub fn on_reorg<F>(&self, callback: F)
112 where
113 F: Fn(&ReorgEvent) + Send + Sync + 'static,
114 {
115 let mut callbacks = self.callbacks.lock().unwrap();
116 callbacks.push(Box::new(callback));
117 }
118
119 pub fn check_block(&self, block_number: u64, block_hash: &str) -> Option<ReorgEvent> {
123 let mut window = self.window.lock().unwrap();
124 let mut last_tip = self.last_tip.lock().unwrap();
125
126 if let Some(stored_hash) = window.get(&block_number) {
128 if stored_hash != block_hash {
129 let fork_block = block_number;
131 let depth = last_tip.unwrap_or(block_number) - fork_block + 1;
132
133 let event = ReorgEvent {
134 fork_block,
135 depth,
136 old_hash: stored_hash.clone(),
137 new_hash: block_hash.to_string(),
138 current_tip: block_number,
139 };
140
141 let affected: Vec<u64> = window
143 .keys()
144 .filter(|&&n| n >= fork_block)
145 .copied()
146 .collect();
147 for n in affected {
148 window.remove(&n);
149 }
150
151 window.insert(block_number, block_hash.to_string());
153 *last_tip = Some(block_number);
154
155 Self::trim_window_inner(&self.config, &mut window, block_number);
157
158 let callbacks = self.callbacks.lock().unwrap();
160 for cb in callbacks.iter() {
161 cb(&event);
162 }
163
164 self.reorg_history.lock().unwrap().push(event.clone());
166
167 return Some(event);
168 }
169 }
170
171 window.insert(block_number, block_hash.to_string());
173 *last_tip = Some(block_number);
174 Self::trim_window_inner(&self.config, &mut window, block_number);
175
176 None
177 }
178
179 fn trim_window_inner(
181 config: &ReorgConfig,
182 window: &mut HashMap<u64, String>,
183 current_tip: u64,
184 ) {
185 if current_tip >= config.window_size as u64 {
186 let cutoff = current_tip - config.window_size as u64;
187 window.retain(|&n, _| n > cutoff);
188 }
189 }
190
191 pub async fn fetch_block_hash(
193 transport: &dyn RpcTransport,
194 block_number: u64,
195 ) -> Result<Option<String>, TransportError> {
196 let hex_block = format!("0x{:x}", block_number);
197 let req = JsonRpcRequest::auto(
198 "eth_getBlockByNumber",
199 vec![Value::String(hex_block), Value::Bool(false)],
200 );
201 let resp = transport.send(req).await?;
202 let value = resp.into_result().map_err(TransportError::Rpc)?;
203
204 Ok(value
205 .get("hash")
206 .and_then(|h| h.as_str())
207 .map(|s| s.to_string()))
208 }
209
210 pub async fn poll_and_check(
218 &self,
219 transport: &dyn RpcTransport,
220 ) -> Result<Option<ReorgEvent>, TransportError> {
221 let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
223 let resp = transport.send(req).await?;
224 let value = resp.into_result().map_err(TransportError::Rpc)?;
225
226 let block_number = value
227 .as_str()
228 .and_then(|hex| u64::from_str_radix(hex.trim_start_matches("0x"), 16).ok())
229 .ok_or_else(|| TransportError::Other("invalid eth_blockNumber response".into()))?;
230
231 let hash = Self::fetch_block_hash(transport, block_number)
233 .await?
234 .ok_or_else(|| TransportError::Other("block not found".into()))?;
235
236 Ok(self.check_block(block_number, &hash))
237 }
238
239 pub fn safe_block(&self) -> Option<u64> {
241 let tip = self.last_tip.lock().unwrap();
242 tip.and_then(|t| t.checked_sub(self.config.safe_depth))
243 }
244
245 pub async fn fetch_finalized_block(
247 transport: &dyn RpcTransport,
248 ) -> Result<u64, TransportError> {
249 let req = JsonRpcRequest::auto(
250 "eth_getBlockByNumber",
251 vec![Value::String("finalized".into()), Value::Bool(false)],
252 );
253 let resp = transport.send(req).await?;
254 let value = resp.into_result().map_err(TransportError::Rpc)?;
255
256 value
257 .get("number")
258 .and_then(|n| n.as_str())
259 .and_then(|hex| u64::from_str_radix(hex.trim_start_matches("0x"), 16).ok())
260 .ok_or_else(|| TransportError::Other("invalid finalized block response".into()))
261 }
262
263 pub fn reorg_history(&self) -> Vec<ReorgEvent> {
265 self.reorg_history.lock().unwrap().clone()
266 }
267
268 pub fn window_size(&self) -> usize {
270 self.window.lock().unwrap().len()
271 }
272
273 pub fn is_block_safe(&self, block_number: u64) -> bool {
275 self.safe_block().is_some_and(|safe| block_number <= safe)
276 }
277}
278
279#[cfg(test)]
284mod tests {
285 use super::*;
286 use crate::request::{JsonRpcResponse, RpcId};
287 use async_trait::async_trait;
288 use std::sync::atomic::{AtomicU32, Ordering};
289 use std::sync::Arc;
290
291 struct MockTransport {
296 responses: Mutex<HashMap<String, Value>>,
297 }
298
299 impl MockTransport {
300 fn new() -> Self {
301 Self {
302 responses: Mutex::new(HashMap::new()),
303 }
304 }
305
306 fn set_response(&self, method: &str, value: Value) {
307 let mut map = self.responses.lock().unwrap();
308 map.insert(method.to_string(), value);
309 }
310 }
311
312 #[async_trait]
313 impl RpcTransport for MockTransport {
314 async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
315 let map = self.responses.lock().unwrap();
316 let result = map.get(&req.method).cloned().unwrap_or(Value::Null);
317 Ok(JsonRpcResponse {
318 jsonrpc: "2.0".into(),
319 id: RpcId::Number(1),
320 result: Some(result),
321 error: None,
322 })
323 }
324
325 fn url(&self) -> &str {
326 "mock://reorg"
327 }
328 }
329
330 #[test]
335 fn no_reorg_sequential_blocks() {
336 let detector = ReorgDetector::new(ReorgConfig::default());
337
338 for i in 100..110 {
340 let hash = format!("0xhash_{i}");
341 let result = detector.check_block(i, &hash);
342 assert!(result.is_none(), "block {i} should not trigger reorg");
343 }
344
345 assert_eq!(detector.window_size(), 10);
346 assert!(detector.reorg_history().is_empty());
347 }
348
349 #[test]
350 fn detect_simple_reorg() {
351 let detector = ReorgDetector::new(ReorgConfig::default());
352
353 assert!(detector.check_block(100, "0xhash_A").is_none());
355
356 let event = detector
358 .check_block(100, "0xhash_B")
359 .expect("should detect reorg");
360
361 assert_eq!(event.fork_block, 100);
362 assert_eq!(event.old_hash, "0xhash_A");
363 assert_eq!(event.new_hash, "0xhash_B");
364 }
365
366 #[test]
367 fn reorg_event_has_correct_fields() {
368 let detector = ReorgDetector::new(ReorgConfig::default());
369
370 detector.check_block(100, "0xA100");
372 detector.check_block(101, "0xA101");
373 detector.check_block(102, "0xA102");
374
375 let event = detector
377 .check_block(101, "0xB101")
378 .expect("should detect reorg");
379
380 assert_eq!(event.fork_block, 101);
381 assert_eq!(event.depth, 2);
382 assert_eq!(event.old_hash, "0xA101");
383 assert_eq!(event.new_hash, "0xB101");
384 assert_eq!(event.current_tip, 101);
385 }
386
387 #[test]
388 fn window_trims_old_blocks() {
389 let config = ReorgConfig {
390 window_size: 5,
391 ..Default::default()
392 };
393 let detector = ReorgDetector::new(config);
394
395 for i in 1..=10 {
397 detector.check_block(i, &format!("0xhash_{i}"));
398 }
399
400 assert_eq!(detector.window_size(), 5);
403
404 assert!(detector.check_block(3, "0xdifferent").is_none());
407 }
408
409 #[test]
410 fn callback_fires_on_reorg() {
411 let detector = ReorgDetector::new(ReorgConfig::default());
412
413 let call_count = Arc::new(AtomicU32::new(0));
414 let count_clone = call_count.clone();
415
416 detector.on_reorg(move |_event| {
417 count_clone.fetch_add(1, Ordering::SeqCst);
418 });
419
420 detector.check_block(100, "0xhash_A");
422 detector.check_block(100, "0xhash_B");
423
424 assert_eq!(call_count.load(Ordering::SeqCst), 1);
425 }
426
427 #[test]
428 fn multiple_callbacks() {
429 let detector = ReorgDetector::new(ReorgConfig::default());
430
431 let count1 = Arc::new(AtomicU32::new(0));
432 let count2 = Arc::new(AtomicU32::new(0));
433 let c1 = count1.clone();
434 let c2 = count2.clone();
435
436 detector.on_reorg(move |_| {
437 c1.fetch_add(1, Ordering::SeqCst);
438 });
439 detector.on_reorg(move |_| {
440 c2.fetch_add(1, Ordering::SeqCst);
441 });
442
443 detector.check_block(100, "0xA");
444 detector.check_block(100, "0xB");
445
446 assert_eq!(count1.load(Ordering::SeqCst), 1);
447 assert_eq!(count2.load(Ordering::SeqCst), 1);
448 }
449
450 #[test]
451 fn reorg_history_recorded() {
452 let detector = ReorgDetector::new(ReorgConfig::default());
453
454 assert!(detector.reorg_history().is_empty());
455
456 detector.check_block(100, "0xA");
458 detector.check_block(100, "0xB");
459
460 detector.check_block(200, "0xC");
462 detector.check_block(200, "0xD");
463
464 let history = detector.reorg_history();
465 assert_eq!(history.len(), 2);
466 assert_eq!(history[0].fork_block, 100);
467 assert_eq!(history[1].fork_block, 200);
468 }
469
470 #[test]
471 fn safe_block_calculation() {
472 let config = ReorgConfig {
473 safe_depth: 10,
474 ..Default::default()
475 };
476 let detector = ReorgDetector::new(config);
477
478 assert!(detector.safe_block().is_none());
480
481 detector.check_block(100, "0xhash");
483 assert_eq!(detector.safe_block(), Some(90));
484
485 detector.check_block(150, "0xhash_150");
487 assert_eq!(detector.safe_block(), Some(140));
488 }
489
490 #[test]
491 fn safe_block_returns_none_when_tip_below_depth() {
492 let config = ReorgConfig {
493 safe_depth: 100,
494 ..Default::default()
495 };
496 let detector = ReorgDetector::new(config);
497
498 detector.check_block(50, "0xhash");
500 assert!(detector.safe_block().is_none());
501 }
502
503 #[test]
504 fn is_block_safe_checks_depth() {
505 let config = ReorgConfig {
506 safe_depth: 10,
507 ..Default::default()
508 };
509 let detector = ReorgDetector::new(config);
510
511 detector.check_block(100, "0xhash");
512 assert!(detector.is_block_safe(80)); assert!(detector.is_block_safe(90)); assert!(!detector.is_block_safe(91)); assert!(!detector.is_block_safe(100)); }
519
520 #[test]
521 fn is_block_safe_false_without_tip() {
522 let detector = ReorgDetector::new(ReorgConfig::default());
523 assert!(!detector.is_block_safe(0));
524 assert!(!detector.is_block_safe(100));
525 }
526
527 #[test]
528 fn reorg_clears_affected_blocks() {
529 let detector = ReorgDetector::new(ReorgConfig::default());
530
531 detector.check_block(100, "0xA100");
533 detector.check_block(101, "0xA101");
534 detector.check_block(102, "0xA102");
535 detector.check_block(103, "0xA103");
536 assert_eq!(detector.window_size(), 4);
537
538 let event = detector
541 .check_block(101, "0xB101")
542 .expect("should detect reorg");
543 assert_eq!(event.fork_block, 101);
544
545 assert_eq!(detector.window_size(), 2);
548
549 assert!(detector.check_block(102, "0xB102").is_none());
552 assert_eq!(detector.window_size(), 3);
553 }
554
555 #[tokio::test]
556 async fn poll_and_check_works() {
557 let transport = MockTransport::new();
558
559 transport.set_response(
561 "eth_blockNumber",
562 Value::String("0x64".into()), );
564 transport.set_response(
565 "eth_getBlockByNumber",
566 serde_json::json!({
567 "number": "0x64",
568 "hash": "0xblock_hash_100"
569 }),
570 );
571
572 let detector = ReorgDetector::new(ReorgConfig::default());
573
574 let result = detector.poll_and_check(&transport).await;
576 assert!(result.is_ok());
577 assert!(result.unwrap().is_none());
578 assert_eq!(detector.window_size(), 1);
579
580 let result = detector.poll_and_check(&transport).await;
582 assert!(result.is_ok());
583 assert!(result.unwrap().is_none());
584
585 transport.set_response(
587 "eth_getBlockByNumber",
588 serde_json::json!({
589 "number": "0x64",
590 "hash": "0xreorged_hash_100"
591 }),
592 );
593
594 let result = detector.poll_and_check(&transport).await;
596 assert!(result.is_ok());
597 let event = result.unwrap().expect("should detect reorg");
598 assert_eq!(event.fork_block, 100);
599 assert_eq!(event.old_hash, "0xblock_hash_100");
600 assert_eq!(event.new_hash, "0xreorged_hash_100");
601 }
602
603 #[tokio::test]
604 async fn fetch_block_hash_works() {
605 let transport = MockTransport::new();
606 transport.set_response(
607 "eth_getBlockByNumber",
608 serde_json::json!({
609 "number": "0xc8",
610 "hash": "0xblock_hash_200"
611 }),
612 );
613
614 let hash = ReorgDetector::fetch_block_hash(&transport, 200).await;
615 assert!(hash.is_ok());
616 assert_eq!(hash.unwrap(), Some("0xblock_hash_200".to_string()));
617 }
618
619 #[tokio::test]
620 async fn fetch_block_hash_returns_none_for_null_hash() {
621 let transport = MockTransport::new();
622 transport.set_response(
623 "eth_getBlockByNumber",
624 serde_json::json!({
625 "number": "0xc8"
626 }),
628 );
629
630 let hash = ReorgDetector::fetch_block_hash(&transport, 200).await;
631 assert!(hash.is_ok());
632 assert!(hash.unwrap().is_none());
633 }
634
635 #[tokio::test]
636 async fn fetch_finalized_block_works() {
637 let transport = MockTransport::new();
638 transport.set_response(
639 "eth_getBlockByNumber",
640 serde_json::json!({
641 "number": "0x1f4",
642 "hash": "0xfinalized_hash"
643 }),
644 );
645
646 let block = ReorgDetector::fetch_finalized_block(&transport).await;
647 assert!(block.is_ok());
648 assert_eq!(block.unwrap(), 500); }
650
651 #[test]
652 fn reorg_event_serializable() {
653 let event = ReorgEvent {
654 fork_block: 100,
655 depth: 3,
656 old_hash: "0xold".into(),
657 new_hash: "0xnew".into(),
658 current_tip: 102,
659 };
660
661 let json = serde_json::to_string(&event).unwrap();
662 assert!(json.contains("fork_block"));
663 assert!(json.contains("100"));
664 assert!(json.contains("0xold"));
665 assert!(json.contains("0xnew"));
666 }
667
668 #[test]
669 fn default_config_values() {
670 let config = ReorgConfig::default();
671 assert_eq!(config.window_size, 128);
672 assert_eq!(config.safe_depth, 64);
673 assert!(config.use_finalized_tag);
674 }
675}