// chainrpc_core/reorg.rs

//! Chain reorganization detection at the RPC transport layer.
//!
//! Monitors block hashes via a sliding window and detects when the chain
//! has reorganized (a previously-seen block hash changes for the same height).
//!
//! When a reorg is detected, registered callbacks are invoked with the
//! fork point (the block number whose hash was observed to change).
//!
//! # Architecture
//!
//! The reorg detector maintains a bounded window of `(block_number, block_hash)`
//! pairs. On each new block:
//!
//! 1. Fetch block by number via `eth_getBlockByNumber`
//! 2. Compare hash against stored hash for that height
//! 3. If mismatch -> treat that height as the fork point (no backward walk is performed)
//! 4. Invoke all registered `on_reorg` callbacks
//! 5. Invalidate affected entries in the window
19
20use std::collections::HashMap;
21use std::sync::Mutex;
22
23use serde::Serialize;
24use serde_json::Value;
25
26use crate::error::TransportError;
27use crate::request::JsonRpcRequest;
28use crate::transport::RpcTransport;
29
// ---------------------------------------------------------------------------
// ReorgConfig
// ---------------------------------------------------------------------------
33
/// Configuration for the reorg detector.
///
/// Construct it with [`Default::default`] and override individual fields with
/// struct-update syntax, e.g. `ReorgConfig { window_size: 32, ..Default::default() }`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReorgConfig {
    /// Number of blocks to keep in the sliding window (default: 128).
    pub window_size: usize,
    /// Minimum block depth before considering a block "safe" (default: 64).
    pub safe_depth: u64,
    /// Whether to use the "finalized" block tag for the safe block (default: true).
    ///
    /// NOTE(review): no code in this module reads this flag; confirm where it
    /// is consumed.
    pub use_finalized_tag: bool,
}

impl Default for ReorgConfig {
    /// Returns the documented defaults: a 128-block window, a safe depth of
    /// 64 blocks, and the finalized tag enabled.
    fn default() -> Self {
        Self {
            window_size: 128,
            safe_depth: 64,
            use_finalized_tag: true,
        }
    }
}
54
// ---------------------------------------------------------------------------
// ReorgEvent
// ---------------------------------------------------------------------------
58
59/// Information about a detected reorganization.
60#[derive(Debug, Clone, Serialize)]
61pub struct ReorgEvent {
62    /// The lowest block number that changed (fork point).
63    pub fork_block: u64,
64    /// The depth of the reorg (how many blocks were replaced).
65    pub depth: u64,
66    /// The old block hash at the fork point.
67    pub old_hash: String,
68    /// The new block hash at the fork point.
69    pub new_hash: String,
70    /// The current chain tip block number.
71    pub current_tip: u64,
72}
73
// ---------------------------------------------------------------------------
// ReorgDetector
// ---------------------------------------------------------------------------
77
78/// Chain reorganization detector.
79///
80/// Maintains a sliding window of block hashes and detects when the chain
81/// reorganizes by comparing new block hashes against stored ones.
82///
83/// Thread-safe via interior `Mutex`es — suitable for shared access across
84/// Tokio tasks behind an `Arc`.
85pub struct ReorgDetector {
86    config: ReorgConfig,
87    /// Sliding window: block_number -> block_hash.
88    window: Mutex<HashMap<u64, String>>,
89    /// Last known tip.
90    last_tip: Mutex<Option<u64>>,
91    /// Registered callbacks -- called with ReorgEvent when reorg detected.
92    #[allow(clippy::type_complexity)]
93    callbacks: Mutex<Vec<Box<dyn Fn(&ReorgEvent) + Send + Sync>>>,
94    /// History of detected reorgs.
95    reorg_history: Mutex<Vec<ReorgEvent>>,
96}
97
98impl ReorgDetector {
99    /// Create a new reorg detector with the given configuration.
100    pub fn new(config: ReorgConfig) -> Self {
101        Self {
102            config,
103            window: Mutex::new(HashMap::new()),
104            last_tip: Mutex::new(None),
105            callbacks: Mutex::new(Vec::new()),
106            reorg_history: Mutex::new(Vec::new()),
107        }
108    }
109
110    /// Register a callback that fires on reorg detection.
111    pub fn on_reorg<F>(&self, callback: F)
112    where
113        F: Fn(&ReorgEvent) + Send + Sync + 'static,
114    {
115        let mut callbacks = self.callbacks.lock().unwrap();
116        callbacks.push(Box::new(callback));
117    }
118
119    /// Check a block against the window. Returns `Some(ReorgEvent)` if reorg detected.
120    ///
121    /// Call this with each new block as it arrives.
122    pub fn check_block(&self, block_number: u64, block_hash: &str) -> Option<ReorgEvent> {
123        let mut window = self.window.lock().unwrap();
124        let mut last_tip = self.last_tip.lock().unwrap();
125
126        // Check if we have a stored hash for this block number
127        if let Some(stored_hash) = window.get(&block_number) {
128            if stored_hash != block_hash {
129                // REORG DETECTED -- find fork point
130                let fork_block = block_number;
131                let depth = last_tip.unwrap_or(block_number) - fork_block + 1;
132
133                let event = ReorgEvent {
134                    fork_block,
135                    depth,
136                    old_hash: stored_hash.clone(),
137                    new_hash: block_hash.to_string(),
138                    current_tip: block_number,
139                };
140
141                // Invalidate affected blocks in window
142                let affected: Vec<u64> = window
143                    .keys()
144                    .filter(|&&n| n >= fork_block)
145                    .copied()
146                    .collect();
147                for n in affected {
148                    window.remove(&n);
149                }
150
151                // Store new block
152                window.insert(block_number, block_hash.to_string());
153                *last_tip = Some(block_number);
154
155                // Trim window
156                Self::trim_window_inner(&self.config, &mut window, block_number);
157
158                // Fire callbacks
159                let callbacks = self.callbacks.lock().unwrap();
160                for cb in callbacks.iter() {
161                    cb(&event);
162                }
163
164                // Store in history
165                self.reorg_history.lock().unwrap().push(event.clone());
166
167                return Some(event);
168            }
169        }
170
171        // No reorg -- store block and advance tip
172        window.insert(block_number, block_hash.to_string());
173        *last_tip = Some(block_number);
174        Self::trim_window_inner(&self.config, &mut window, block_number);
175
176        None
177    }
178
179    /// Trim blocks that fall outside the window.
180    fn trim_window_inner(
181        config: &ReorgConfig,
182        window: &mut HashMap<u64, String>,
183        current_tip: u64,
184    ) {
185        if current_tip >= config.window_size as u64 {
186            let cutoff = current_tip - config.window_size as u64;
187            window.retain(|&n, _| n > cutoff);
188        }
189    }
190
191    /// Query the transport for a block hash at a given height.
192    pub async fn fetch_block_hash(
193        transport: &dyn RpcTransport,
194        block_number: u64,
195    ) -> Result<Option<String>, TransportError> {
196        let hex_block = format!("0x{:x}", block_number);
197        let req = JsonRpcRequest::auto(
198            "eth_getBlockByNumber",
199            vec![Value::String(hex_block), Value::Bool(false)],
200        );
201        let resp = transport.send(req).await?;
202        let value = resp.into_result().map_err(TransportError::Rpc)?;
203
204        Ok(value
205            .get("hash")
206            .and_then(|h| h.as_str())
207            .map(|s| s.to_string()))
208    }
209
210    /// Poll the chain for new blocks and check for reorgs.
211    ///
212    /// 1. Gets the current block number
213    /// 2. Fetches the block hash
214    /// 3. Calls check_block
215    ///
216    /// Returns any detected ReorgEvent.
217    pub async fn poll_and_check(
218        &self,
219        transport: &dyn RpcTransport,
220    ) -> Result<Option<ReorgEvent>, TransportError> {
221        // Get current block number
222        let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
223        let resp = transport.send(req).await?;
224        let value = resp.into_result().map_err(TransportError::Rpc)?;
225
226        let block_number = value
227            .as_str()
228            .and_then(|hex| u64::from_str_radix(hex.trim_start_matches("0x"), 16).ok())
229            .ok_or_else(|| TransportError::Other("invalid eth_blockNumber response".into()))?;
230
231        // Get block hash
232        let hash = Self::fetch_block_hash(transport, block_number)
233            .await?
234            .ok_or_else(|| TransportError::Other("block not found".into()))?;
235
236        Ok(self.check_block(block_number, &hash))
237    }
238
239    /// Get the safe block number (current tip - safe_depth).
240    pub fn safe_block(&self) -> Option<u64> {
241        let tip = self.last_tip.lock().unwrap();
242        tip.and_then(|t| t.checked_sub(self.config.safe_depth))
243    }
244
245    /// Get the finalized block from the chain.
246    pub async fn fetch_finalized_block(
247        transport: &dyn RpcTransport,
248    ) -> Result<u64, TransportError> {
249        let req = JsonRpcRequest::auto(
250            "eth_getBlockByNumber",
251            vec![Value::String("finalized".into()), Value::Bool(false)],
252        );
253        let resp = transport.send(req).await?;
254        let value = resp.into_result().map_err(TransportError::Rpc)?;
255
256        value
257            .get("number")
258            .and_then(|n| n.as_str())
259            .and_then(|hex| u64::from_str_radix(hex.trim_start_matches("0x"), 16).ok())
260            .ok_or_else(|| TransportError::Other("invalid finalized block response".into()))
261    }
262
263    /// Get reorg history.
264    pub fn reorg_history(&self) -> Vec<ReorgEvent> {
265        self.reorg_history.lock().unwrap().clone()
266    }
267
268    /// Number of blocks currently in the window.
269    pub fn window_size(&self) -> usize {
270        self.window.lock().unwrap().len()
271    }
272
273    /// Check if a block number is in the safe zone (below safe_depth).
274    pub fn is_block_safe(&self, block_number: u64) -> bool {
275        self.safe_block().is_some_and(|safe| block_number <= safe)
276    }
277}
278
// ===========================================================================
// Tests
// ===========================================================================
282
#[cfg(test)]
mod tests {
    use super::*;
    use crate::request::{JsonRpcResponse, RpcId};
    use async_trait::async_trait;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    // -----------------------------------------------------------------------
    // Mock transport
    // -----------------------------------------------------------------------

    /// Transport double that answers each request with the canned result
    /// registered for its method name (`null` when nothing is registered).
    struct MockTransport {
        responses: Mutex<HashMap<String, Value>>,
    }

    impl MockTransport {
        fn new() -> Self {
            let responses = Mutex::new(HashMap::new());
            Self { responses }
        }

        /// Register (or replace) the result returned for `method`.
        fn set_response(&self, method: &str, value: Value) {
            self.responses
                .lock()
                .unwrap()
                .insert(method.to_string(), value);
        }
    }

    #[async_trait]
    impl RpcTransport for MockTransport {
        async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
            let canned = self.responses.lock().unwrap().get(&req.method).cloned();
            Ok(JsonRpcResponse {
                jsonrpc: "2.0".into(),
                id: RpcId::Number(1),
                result: Some(canned.unwrap_or(Value::Null)),
                error: None,
            })
        }

        fn url(&self) -> &str {
            "mock://reorg"
        }
    }

    // -----------------------------------------------------------------------
    // Helpers
    // -----------------------------------------------------------------------

    /// Detector built from the default configuration.
    fn default_detector() -> ReorgDetector {
        ReorgDetector::new(ReorgConfig::default())
    }

    /// Unwrap a `check_block` outcome that must be a reorg.
    fn expect_reorg(outcome: Option<ReorgEvent>) -> ReorgEvent {
        outcome.expect("should detect reorg")
    }

    /// Canned `eth_getBlockByNumber` result with the given number and hash.
    fn block_response(number: &str, hash: &str) -> Value {
        serde_json::json!({ "number": number, "hash": hash })
    }

    // -----------------------------------------------------------------------
    // Tests
    // -----------------------------------------------------------------------

    #[test]
    fn no_reorg_sequential_blocks() {
        let detector = default_detector();

        // Ten consecutive heights, each observed exactly once.
        (100..110).for_each(|height| {
            let outcome = detector.check_block(height, &format!("0xhash_{height}"));
            assert!(outcome.is_none(), "block {height} should not trigger reorg");
        });

        assert_eq!(detector.window_size(), 10);
        assert!(detector.reorg_history().is_empty());
    }

    #[test]
    fn detect_simple_reorg() {
        let detector = default_detector();

        // First sighting of height 100 is recorded silently.
        let first = detector.check_block(100, "0xhash_A");
        assert!(first.is_none());

        // Same height, different hash -> reorg.
        let event = expect_reorg(detector.check_block(100, "0xhash_B"));

        assert_eq!(event.fork_block, 100);
        assert_eq!(event.old_hash, "0xhash_A");
        assert_eq!(event.new_hash, "0xhash_B");
    }

    #[test]
    fn reorg_event_has_correct_fields() {
        let detector = default_detector();

        // Chain of three blocks: 100, 101, 102.
        for (height, hash) in [(100, "0xA100"), (101, "0xA101"), (102, "0xA102")] {
            detector.check_block(height, hash);
        }

        // Fork at 101 while the tip was 102: depth = 102 - 101 + 1 = 2.
        let event = expect_reorg(detector.check_block(101, "0xB101"));

        assert_eq!(event.fork_block, 101);
        assert_eq!(event.depth, 2);
        assert_eq!(event.old_hash, "0xA101");
        assert_eq!(event.new_hash, "0xB101");
        assert_eq!(event.current_tip, 101);
    }

    #[test]
    fn window_trims_old_blocks() {
        let detector = ReorgDetector::new(ReorgConfig {
            window_size: 5,
            ..Default::default()
        });

        // Feed heights 1 through 10.
        (1..=10).for_each(|height| {
            detector.check_block(height, &format!("0xhash_{height}"));
        });

        // Tip 10 with a window of 5 gives cutoff 5, so only 6..=10 survive.
        assert_eq!(detector.window_size(), 5);

        // Height 3 was trimmed, so a different hash for it is not a reorg.
        let outcome = detector.check_block(3, "0xdifferent");
        assert!(outcome.is_none());
    }

    #[test]
    fn callback_fires_on_reorg() {
        let detector = default_detector();

        let call_count = Arc::new(AtomicU32::new(0));
        {
            let counter = Arc::clone(&call_count);
            detector.on_reorg(move |_event| {
                counter.fetch_add(1, Ordering::SeqCst);
            });
        }

        // Record a block, then replace it.
        detector.check_block(100, "0xhash_A");
        detector.check_block(100, "0xhash_B");

        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn multiple_callbacks() {
        let detector = default_detector();

        let counters = [Arc::new(AtomicU32::new(0)), Arc::new(AtomicU32::new(0))];
        for counter in &counters {
            let counter = Arc::clone(counter);
            detector.on_reorg(move |_| {
                counter.fetch_add(1, Ordering::SeqCst);
            });
        }

        detector.check_block(100, "0xA");
        detector.check_block(100, "0xB");

        // Each listener saw the single reorg exactly once.
        assert_eq!(counters[0].load(Ordering::SeqCst), 1);
        assert_eq!(counters[1].load(Ordering::SeqCst), 1);
    }

    #[test]
    fn reorg_history_recorded() {
        let detector = default_detector();
        assert!(detector.reorg_history().is_empty());

        // Two independent reorgs, at heights 100 and 200.
        for (height, before, after) in [(100, "0xA", "0xB"), (200, "0xC", "0xD")] {
            detector.check_block(height, before);
            detector.check_block(height, after);
        }

        let history = detector.reorg_history();
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].fork_block, 100);
        assert_eq!(history[1].fork_block, 200);
    }

    #[test]
    fn safe_block_calculation() {
        let detector = ReorgDetector::new(ReorgConfig {
            safe_depth: 10,
            ..Default::default()
        });

        // Nothing observed yet.
        assert!(detector.safe_block().is_none());

        // Tip 100 minus depth 10 -> 90.
        detector.check_block(100, "0xhash");
        assert_eq!(detector.safe_block(), Some(90));

        // Tip 150 minus depth 10 -> 140.
        detector.check_block(150, "0xhash_150");
        assert_eq!(detector.safe_block(), Some(140));
    }

    #[test]
    fn safe_block_returns_none_when_tip_below_depth() {
        let detector = ReorgDetector::new(ReorgConfig {
            safe_depth: 100,
            ..Default::default()
        });

        // Tip 50 is shallower than the safe depth of 100.
        detector.check_block(50, "0xhash");
        assert!(detector.safe_block().is_none());
    }

    #[test]
    fn is_block_safe_checks_depth() {
        let detector = ReorgDetector::new(ReorgConfig {
            safe_depth: 10,
            ..Default::default()
        });

        // Tip 100 -> safe block 90.
        detector.check_block(100, "0xhash");

        assert!(detector.is_block_safe(80)); // below 90
        assert!(detector.is_block_safe(90)); // exactly 90
        assert!(!detector.is_block_safe(91)); // above 90
        assert!(!detector.is_block_safe(100)); // at tip
    }

    #[test]
    fn is_block_safe_false_without_tip() {
        let detector = default_detector();
        for height in [0, 100] {
            assert!(!detector.is_block_safe(height));
        }
    }

    #[test]
    fn reorg_clears_affected_blocks() {
        let detector = default_detector();

        // Chain: 100, 101, 102, 103.
        for (height, hash) in [
            (100, "0xA100"),
            (101, "0xA101"),
            (102, "0xA102"),
            (103, "0xA103"),
        ] {
            detector.check_block(height, hash);
        }
        assert_eq!(detector.window_size(), 4);

        // Fork at 101: entries 101..=103 are dropped, then 101 is stored
        // again with its new hash.
        let event = expect_reorg(detector.check_block(101, "0xB101"));
        assert_eq!(event.fork_block, 101);

        // Left in the window: 100 (untouched) and 101 (new hash).
        assert_eq!(detector.window_size(), 2);

        // 102 is no longer tracked, so a new hash for it is not a reorg.
        assert!(detector.check_block(102, "0xB102").is_none());
        assert_eq!(detector.window_size(), 3);
    }

    #[tokio::test]
    async fn poll_and_check_works() {
        let transport = MockTransport::new();

        // Tip is block 100 (0x64) with a known hash.
        transport.set_response("eth_blockNumber", Value::String("0x64".into()));
        transport.set_response(
            "eth_getBlockByNumber",
            block_response("0x64", "0xblock_hash_100"),
        );

        let detector = default_detector();

        // First poll fills the fresh window without reporting anything.
        let first = detector.poll_and_check(&transport).await;
        assert!(matches!(first, Ok(None)));
        assert_eq!(detector.window_size(), 1);

        // Same block and hash again -> still nothing to report.
        let second = detector.poll_and_check(&transport).await;
        assert!(matches!(second, Ok(None)));

        // The node now serves a different hash for the same height.
        transport.set_response(
            "eth_getBlockByNumber",
            block_response("0x64", "0xreorged_hash_100"),
        );

        // Third poll must surface the reorg.
        let third = detector.poll_and_check(&transport).await;
        assert!(third.is_ok());
        let event = expect_reorg(third.unwrap());
        assert_eq!(event.fork_block, 100);
        assert_eq!(event.old_hash, "0xblock_hash_100");
        assert_eq!(event.new_hash, "0xreorged_hash_100");
    }

    #[tokio::test]
    async fn fetch_block_hash_works() {
        let transport = MockTransport::new();
        transport.set_response(
            "eth_getBlockByNumber",
            block_response("0xc8", "0xblock_hash_200"),
        );

        let fetched = ReorgDetector::fetch_block_hash(&transport, 200).await;
        assert!(fetched.is_ok());
        assert_eq!(fetched.unwrap(), Some("0xblock_hash_200".to_string()));
    }

    #[tokio::test]
    async fn fetch_block_hash_returns_none_for_null_hash() {
        let transport = MockTransport::new();
        // The canned block deliberately has no "hash" field.
        transport.set_response(
            "eth_getBlockByNumber",
            serde_json::json!({ "number": "0xc8" }),
        );

        let fetched = ReorgDetector::fetch_block_hash(&transport, 200).await;
        assert!(fetched.is_ok());
        assert!(fetched.unwrap().is_none());
    }

    #[tokio::test]
    async fn fetch_finalized_block_works() {
        let transport = MockTransport::new();
        transport.set_response(
            "eth_getBlockByNumber",
            block_response("0x1f4", "0xfinalized_hash"),
        );

        let finalized = ReorgDetector::fetch_finalized_block(&transport).await;
        assert!(finalized.is_ok());
        assert_eq!(finalized.unwrap(), 500); // 0x1f4 = 500
    }

    #[test]
    fn reorg_event_serializable() {
        let event = ReorgEvent {
            fork_block: 100,
            depth: 3,
            old_hash: "0xold".into(),
            new_hash: "0xnew".into(),
            current_tip: 102,
        };

        let json = serde_json::to_string(&event).unwrap();
        for needle in ["fork_block", "100", "0xold", "0xnew"] {
            assert!(json.contains(needle));
        }
    }

    #[test]
    fn default_config_values() {
        let ReorgConfig {
            window_size,
            safe_depth,
            use_finalized_tag,
        } = ReorgConfig::default();

        assert_eq!(window_size, 128);
        assert_eq!(safe_depth, 64);
        assert!(use_finalized_tag);
    }
}