Skip to main content

bsv_wallet_toolbox/monitor/tasks/
task_new_header.rs

1//! TaskNewHeader -- polls for new block headers from chain tracker.
2//!
3//! Translated from wallet-toolbox/src/monitor/tasks/TaskNewHeader.ts (93 lines).
4//!
5//! Polls the chain tip height periodically. When a new block is detected,
6//! queues the header for one cycle. If the header remains the tip after
7//! a full cycle, triggers proof checking via the shared check_now flag.
8
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11
12use async_trait::async_trait;
13
14use crate::error::WalletError;
15use crate::monitor::helpers::now_msecs;
16use crate::monitor::ONE_MINUTE;
17use crate::services::traits::WalletServices;
18use crate::services::types::BlockHeader;
19use crate::storage::manager::WalletStorageManager;
20
21use super::super::task_trait::WalletMonitorTask;
22
23/// Background task that polls for new block headers.
24///
25/// When a new block is detected, it queues the header and waits one cycle.
26/// If the header remains the chain tip after a full cycle (no further new blocks),
27/// it triggers proof checking by setting the shared check_now flag.
28///
29/// This aging pattern avoids chasing proofs during rapid block reorgs.
30pub struct TaskNewHeader {
31    /// Storage manager for persistence operations (reserved for future header persistence).
32    _storage: WalletStorageManager,
33    /// Network services for chain tip queries.
34    services: Arc<dyn WalletServices>,
35    /// How often to trigger (default 1 minute).
36    trigger_msecs: u64,
37    /// Last time this task ran (epoch ms).
38    last_run_msecs: u64,
39    /// The most recent chain tip header known to this task.
40    header: Option<BlockHeader>,
41    /// A new header queued for aging (set to None after processing).
42    queued_header: Option<BlockHeader>,
43    /// When the queued header was first seen (epoch ms).
44    queued_header_when: Option<u64>,
45    /// Shared flag with TaskCheckForProofs: set to true to nudge proof checking.
46    check_now: Arc<AtomicBool>,
47}
48
49impl TaskNewHeader {
50    /// Create a new TaskNewHeader with default intervals.
51    pub fn new(
52        storage: WalletStorageManager,
53        services: Arc<dyn WalletServices>,
54        check_now: Arc<AtomicBool>,
55    ) -> Self {
56        Self {
57            _storage: storage,
58            services,
59            trigger_msecs: ONE_MINUTE,
60            last_run_msecs: 0,
61            header: None,
62            queued_header: None,
63            queued_header_when: None,
64            check_now,
65        }
66    }
67
68    /// Create with a custom trigger interval.
69    pub fn with_trigger_msecs(
70        storage: WalletStorageManager,
71        services: Arc<dyn WalletServices>,
72        check_now: Arc<AtomicBool>,
73        trigger_msecs: u64,
74    ) -> Self {
75        Self {
76            _storage: storage,
77            services,
78            trigger_msecs,
79            last_run_msecs: 0,
80            header: None,
81            queued_header: None,
82            queued_header_when: None,
83            check_now,
84        }
85    }
86}
87
88#[async_trait]
89impl WalletMonitorTask for TaskNewHeader {
90    fn name(&self) -> &str {
91        "NewHeader"
92    }
93
94    fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
95        // Always run on each cycle (matching TS where trigger returns { run: true })
96        if now_msecs_since_epoch > self.last_run_msecs + self.trigger_msecs {
97            self.last_run_msecs = now_msecs_since_epoch;
98            true
99        } else {
100            false
101        }
102    }
103
104    async fn run_task(&mut self) -> Result<String, WalletError> {
105        let mut log = String::new();
106
107        // Get current chain tip height
108        let current_height = match self.services.get_height().await {
109            Ok(h) => h,
110            Err(e) => {
111                return Ok(format!("error getting chain height: {}", e));
112            }
113        };
114
115        // Build a simple header from height (full header data comes from chain tracker)
116        let current_header = BlockHeader {
117            version: 1,
118            previous_hash: String::new(),
119            merkle_root: String::new(),
120            time: 0,
121            bits: 0,
122            nonce: 0,
123            height: current_height,
124            hash: format!("height_{}", current_height),
125        };
126
127        let old_header = self.header.clone();
128        let mut is_new = true;
129
130        match &old_header {
131            None => {
132                log = format!(
133                    "first header: {} {}",
134                    current_header.height, current_header.hash
135                );
136                self.header = Some(current_header.clone());
137            }
138            Some(old) if old.height > current_header.height => {
139                log = format!("old header: {} vs {}", current_header.height, old.height);
140                // Revert to old header with the higher height
141                is_new = false;
142            }
143            Some(old) if old.height < current_header.height => {
144                let skip = current_header.height - old.height - 1;
145                let skipped = if skip > 0 {
146                    format!(" SKIPPED {}", skip)
147                } else {
148                    String::new()
149                };
150                log = format!(
151                    "new header: {} {}{}",
152                    current_header.height, current_header.hash, skipped
153                );
154                self.header = Some(current_header.clone());
155            }
156            Some(old) if old.height == current_header.height && old.hash != current_header.hash => {
157                log = format!(
158                    "reorg header: {} {}",
159                    current_header.height, current_header.hash
160                );
161                self.header = Some(current_header.clone());
162            }
163            _ => {
164                // Same height, same hash -- no change
165                is_new = false;
166            }
167        }
168
169        if is_new {
170            self.queued_header = self.header.clone();
171            self.queued_header_when = Some(now_msecs());
172        } else if let Some(ref _queued) = self.queued_header.clone() {
173            // Only process new block header if it has remained the chain tip for a full cycle
174            let delay = if let Some(when) = self.queued_header_when {
175                (now_msecs() - when) as f64 / 1000.0
176            } else {
177                0.0
178            };
179            if let Some(ref h) = self.header {
180                log = format!(
181                    "process header: {} {} delayed {:.1} secs",
182                    h.height, h.hash, delay
183                );
184            }
185            // Nudge proof checking
186            self.check_now.store(true, Ordering::SeqCst);
187            self.queued_header = None;
188        }
189
190        Ok(log)
191    }
192}
193
194// ---------------------------------------------------------------------------
195// Tests
196// ---------------------------------------------------------------------------
197
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::ONE_MINUTE;

    // NOTE(review): none of these tests construct a `TaskNewHeader`; they only
    // pin a constant, a literal, and standard-library flag behaviour.

    /// The default interval constant is sixty thousand milliseconds.
    #[test]
    fn test_default_trigger_interval() {
        let expected_msecs = 60_000;
        assert_eq!(ONE_MINUTE, expected_msecs);
    }

    /// Compares the expected task name literal with itself.
    #[test]
    fn test_task_name() {
        let expected = "NewHeader";
        assert_eq!("NewHeader", expected);
    }

    /// A shared `AtomicBool` starts clear and reads back `true` once stored.
    #[test]
    fn test_check_now_flag_interaction() {
        let flag = Arc::new(AtomicBool::new(false));
        let before = flag.load(Ordering::SeqCst);
        assert!(!before);

        flag.store(true, Ordering::SeqCst);
        let after = flag.load(Ordering::SeqCst);
        assert!(after);
    }
}