Skip to main content

csv_adapter_core/
monitor.rs

1//! Reorg monitoring and censorship detection
2//!
3//! Cross-cutting components that track chain state across all adapters:
4//! - Reorg detection and anchor invalidation
5//! - Publication timeout tracking (censorship detection)
6
7use alloc::collections::BTreeMap;
8use alloc::string::String;
9use alloc::vec::Vec;
10
11use crate::hash::Hash;
12use crate::store::{SealStore, StoreError};
13
14/// Reorg event detected on a chain
15#[derive(Clone, Debug)]
16pub struct ReorgEvent {
17    /// Chain identifier
18    pub chain: String,
19    /// Fork point (last common block height)
20    pub fork_height: u64,
21    /// Depth of the reorg (blocks invalidated)
22    pub depth: u64,
23    /// Old chain tip hash (now orphaned)
24    pub old_tip: [u8; 32],
25    /// New chain tip hash (current best chain)
26    pub new_tip: [u8; 32],
27}
28
29/// Chain tip tracker for detecting blockchain reorganizations
30pub struct ReorgMonitor {
31    /// Known chain tips (chain → (height, hash))
32    tips: BTreeMap<String, (u64, [u8; 32])>,
33    /// Recent reorg events
34    reorgs: Vec<ReorgEvent>,
35}
36
37impl ReorgMonitor {
38    /// Create a new reorg monitor with empty state
39    pub fn new() -> Self {
40        Self {
41            tips: BTreeMap::new(),
42            reorgs: Vec::new(),
43        }
44    }
45
46    /// Update the chain tip. Returns `Some(ReorgEvent)` if a reorg is detected.
47    pub fn update_tip(
48        &mut self,
49        chain: &str,
50        height: u64,
51        tip_hash: [u8; 32],
52    ) -> Option<ReorgEvent> {
53        if let Some(&(prev_height, prev_hash)) = self.tips.get(chain) {
54            if height <= prev_height {
55                // New tip is at or below previous height → reorg
56                let depth = prev_height.saturating_sub(height) + 1;
57                let event = ReorgEvent {
58                    chain: chain.to_string(),
59                    fork_height: height,
60                    depth,
61                    old_tip: prev_hash,
62                    new_tip: tip_hash,
63                };
64                self.tips.insert(chain.to_string(), (height, tip_hash));
65                self.reorgs.push(event.clone());
66                return Some(event);
67            }
68            // Check for gap (reorg + reorg)
69            if height > prev_height + 1 {
70                // Chain jumped forward (normal case, not a reorg)
71            }
72        }
73        self.tips.insert(chain.to_string(), (height, tip_hash));
74        None
75    }
76
77    /// Roll back anchors and seals after a reorg
78    pub fn handle_reorg(
79        &self,
80        event: &ReorgEvent,
81        store: &mut dyn SealStore,
82    ) -> Result<usize, StoreError> {
83        let mut removed = 0;
84        removed += store.remove_anchors_after(&event.chain, event.fork_height)?;
85        removed += store.remove_seals_after(&event.chain, event.fork_height)?;
86        Ok(removed)
87    }
88
89    /// Get recent reorgs for a specific chain
90    pub fn recent_reorgs(&self, chain: &str) -> Vec<&ReorgEvent> {
91        self.reorgs.iter().filter(|r| r.chain == chain).collect()
92    }
93
94    /// Get the current tip for a chain
95    pub fn tip(&self, chain: &str) -> Option<(u64, [u8; 32])> {
96        self.tips.get(chain).copied()
97    }
98}
99
100impl Default for ReorgMonitor {
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106/// Publication tracker for detecting censorship or stuck transactions
107///
108/// Tracks pending publications and flags them if not included
109/// within the configured timeout period.
110pub struct PublicationTracker {
111    /// Pending publications (chain → Vec)
112    pending: BTreeMap<String, Vec<PendingPublication>>,
113    /// Publication timeout in seconds
114    pub timeout_seconds: u64,
115}
116
117/// Represents a transaction awaiting on-chain inclusion
118#[derive(Clone, Debug)]
119pub struct PendingPublication {
120    /// Transaction hash
121    pub tx_hash: Vec<u8>,
122    /// Commitment hash being published
123    pub commitment_hash: Hash,
124    /// Unix epoch seconds when submission occurred
125    pub submitted_at: u64,
126}
127
128impl PublicationTracker {
129    /// Create a new tracker with the given timeout
130    pub fn new(timeout_seconds: u64) -> Self {
131        Self {
132            pending: BTreeMap::new(),
133            timeout_seconds,
134        }
135    }
136
137    /// Record a new publication submission
138    pub fn track_publication(
139        &mut self,
140        chain: &str,
141        tx_hash: Vec<u8>,
142        commitment_hash: Hash,
143        submitted_at: u64,
144    ) {
145        self.pending
146            .entry(chain.to_string())
147            .or_default()
148            .push(PendingPublication {
149                tx_hash,
150                commitment_hash,
151                submitted_at,
152            });
153    }
154
155    /// Confirm a publication was included
156    pub fn confirm_publication(&mut self, chain: &str, tx_hash: &[u8]) -> bool {
157        if let Some(pending) = self.pending.get_mut(chain) {
158            let before = pending.len();
159            pending.retain(|p| p.tx_hash != tx_hash);
160            return pending.len() < before;
161        }
162        false
163    }
164
165    /// Get timed-out publications (censorship candidates)
166    pub fn timed_out(&self, chain: &str, current_time: u64) -> Vec<&PendingPublication> {
167        self.pending
168            .get(chain)
169            .map(|p| {
170                p.iter()
171                    .filter(|pp| {
172                        current_time.saturating_sub(pp.submitted_at) > self.timeout_seconds
173                    })
174                    .collect()
175            })
176            .unwrap_or_default()
177    }
178
179    /// Get count of pending publications
180    pub fn pending_count(&self, chain: &str) -> usize {
181        self.pending.get(chain).map(|p| p.len()).unwrap_or(0)
182    }
183
184    /// Clear all pending publications for a chain
185    pub fn clear_chain(&mut self, chain: &str) {
186        self.pending.remove(chain);
187    }
188}
189
190#[cfg(test)]
191mod tests {
192    use super::*;
193    use crate::store::InMemorySealStore;
194
195    #[test]
196    fn test_reorg_monitor_normal_update() {
197        let mut monitor = ReorgMonitor::new();
198        assert!(monitor.update_tip("bitcoin", 100, [1u8; 32]).is_none());
199        assert!(monitor.update_tip("bitcoin", 101, [2u8; 32]).is_none());
200    }
201
202    #[test]
203    fn test_reorg_monitor_detect_reorg() {
204        let mut monitor = ReorgMonitor::new();
205        monitor.update_tip("bitcoin", 100, [1u8; 32]);
206        monitor.update_tip("bitcoin", 101, [2u8; 32]);
207        // New tip is lower → reorg
208        let event = monitor.update_tip("bitcoin", 99, [3u8; 32]);
209        assert!(event.is_some());
210        let event = event.unwrap();
211        assert_eq!(event.chain, "bitcoin");
212        assert_eq!(event.fork_height, 99);
213        assert_eq!(event.old_tip, [2u8; 32]);
214        assert_eq!(event.new_tip, [3u8; 32]);
215    }
216
217    #[test]
218    fn test_reorg_monitor_handle_reorg() {
219        let mut monitor = ReorgMonitor::new();
220        monitor.update_tip("bitcoin", 100, [1u8; 32]);
221
222        let mut store = InMemorySealStore::new();
223        store
224            .save_seal(&crate::store::SealRecord {
225                chain: "bitcoin".to_string(),
226                seal_id: vec![1],
227                consumed_at_height: 101,
228                commitment_hash: Hash::new([0xAA; 32]),
229                recorded_at: 1700000000,
230            })
231            .unwrap();
232
233        let event = ReorgEvent {
234            chain: "bitcoin".to_string(),
235            fork_height: 100,
236            depth: 1,
237            old_tip: [1u8; 32],
238            new_tip: [2u8; 32],
239        };
240
241        let removed = monitor.handle_reorg(&event, &mut store).unwrap();
242        assert_eq!(removed, 1);
243    }
244
245    #[test]
246    fn test_publication_tracker_lifecycle() {
247        let mut tracker = PublicationTracker::new(3600);
248        tracker.track_publication("bitcoin", vec![1, 2, 3], Hash::new([0xAA; 32]), 1700000000);
249        assert_eq!(tracker.pending_count("bitcoin"), 1);
250
251        // Confirm
252        assert!(tracker.confirm_publication("bitcoin", &[1, 2, 3]));
253        assert_eq!(tracker.pending_count("bitcoin"), 0);
254    }
255
256    #[test]
257    fn test_publication_tracker_timeout() {
258        let mut tracker = PublicationTracker::new(3600);
259        tracker.track_publication("bitcoin", vec![1, 2, 3], Hash::new([0xAA; 32]), 1700000000);
260
261        // Within timeout
262        assert!(tracker.timed_out("bitcoin", 1700003000).is_empty());
263
264        // Past timeout
265        let timed_out = tracker.timed_out("bitcoin", 1700004000);
266        assert_eq!(timed_out.len(), 1);
267    }
268
269    #[test]
270    fn test_publication_tracker_multiple() {
271        let mut tracker = PublicationTracker::new(3600);
272        tracker.track_publication("bitcoin", vec![1], Hash::new([1u8; 32]), 1700000000);
273        tracker.track_publication("bitcoin", vec![2], Hash::new([2u8; 32]), 1700000000);
274        tracker.track_publication("ethereum", vec![3], Hash::new([3u8; 32]), 1700000000);
275
276        assert_eq!(tracker.pending_count("bitcoin"), 2);
277        assert_eq!(tracker.pending_count("ethereum"), 1);
278
279        tracker.confirm_publication("bitcoin", &[1]);
280        assert_eq!(tracker.pending_count("bitcoin"), 1);
281
282        tracker.clear_chain("bitcoin");
283        assert_eq!(tracker.pending_count("bitcoin"), 0);
284    }
285}