bsv_wallet_toolbox/monitor/tasks/
task_new_header.rs1use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11
12use async_trait::async_trait;
13
14use crate::error::WalletError;
15use crate::monitor::helpers::now_msecs;
16use crate::monitor::ONE_MINUTE;
17use crate::services::traits::WalletServices;
18use crate::services::types::BlockHeader;
19use crate::storage::manager::WalletStorageManager;
20
21use super::super::task_trait::WalletMonitorTask;
22
23pub struct TaskNewHeader {
31 _storage: WalletStorageManager,
33 services: Arc<dyn WalletServices>,
35 trigger_msecs: u64,
37 last_run_msecs: u64,
39 header: Option<BlockHeader>,
41 queued_header: Option<BlockHeader>,
43 queued_header_when: Option<u64>,
45 check_now: Arc<AtomicBool>,
47}
48
49impl TaskNewHeader {
50 pub fn new(
52 storage: WalletStorageManager,
53 services: Arc<dyn WalletServices>,
54 check_now: Arc<AtomicBool>,
55 ) -> Self {
56 Self {
57 _storage: storage,
58 services,
59 trigger_msecs: ONE_MINUTE,
60 last_run_msecs: 0,
61 header: None,
62 queued_header: None,
63 queued_header_when: None,
64 check_now,
65 }
66 }
67
68 pub fn with_trigger_msecs(
70 storage: WalletStorageManager,
71 services: Arc<dyn WalletServices>,
72 check_now: Arc<AtomicBool>,
73 trigger_msecs: u64,
74 ) -> Self {
75 Self {
76 _storage: storage,
77 services,
78 trigger_msecs,
79 last_run_msecs: 0,
80 header: None,
81 queued_header: None,
82 queued_header_when: None,
83 check_now,
84 }
85 }
86}
87
88#[async_trait]
89impl WalletMonitorTask for TaskNewHeader {
90 fn name(&self) -> &str {
91 "NewHeader"
92 }
93
94 fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
95 if now_msecs_since_epoch > self.last_run_msecs + self.trigger_msecs {
97 self.last_run_msecs = now_msecs_since_epoch;
98 true
99 } else {
100 false
101 }
102 }
103
104 async fn run_task(&mut self) -> Result<String, WalletError> {
105 let mut log = String::new();
106
107 let current_height = match self.services.get_height().await {
109 Ok(h) => h,
110 Err(e) => {
111 return Ok(format!("error getting chain height: {}", e));
112 }
113 };
114
115 let current_header = BlockHeader {
117 version: 1,
118 previous_hash: String::new(),
119 merkle_root: String::new(),
120 time: 0,
121 bits: 0,
122 nonce: 0,
123 height: current_height,
124 hash: format!("height_{}", current_height),
125 };
126
127 let old_header = self.header.clone();
128 let mut is_new = true;
129
130 match &old_header {
131 None => {
132 log = format!(
133 "first header: {} {}",
134 current_header.height, current_header.hash
135 );
136 self.header = Some(current_header.clone());
137 }
138 Some(old) if old.height > current_header.height => {
139 log = format!("old header: {} vs {}", current_header.height, old.height);
140 is_new = false;
142 }
143 Some(old) if old.height < current_header.height => {
144 let skip = current_header.height - old.height - 1;
145 let skipped = if skip > 0 {
146 format!(" SKIPPED {}", skip)
147 } else {
148 String::new()
149 };
150 log = format!(
151 "new header: {} {}{}",
152 current_header.height, current_header.hash, skipped
153 );
154 self.header = Some(current_header.clone());
155 }
156 Some(old) if old.height == current_header.height && old.hash != current_header.hash => {
157 log = format!(
158 "reorg header: {} {}",
159 current_header.height, current_header.hash
160 );
161 self.header = Some(current_header.clone());
162 }
163 _ => {
164 is_new = false;
166 }
167 }
168
169 if is_new {
170 self.queued_header = self.header.clone();
171 self.queued_header_when = Some(now_msecs());
172 } else if let Some(ref _queued) = self.queued_header.clone() {
173 let delay = if let Some(when) = self.queued_header_when {
175 (now_msecs() - when) as f64 / 1000.0
176 } else {
177 0.0
178 };
179 if let Some(ref h) = self.header {
180 log = format!(
181 "process header: {} {} delayed {:.1} secs",
182 h.height, h.hash, delay
183 );
184 }
185 self.check_now.store(true, Ordering::SeqCst);
187 self.queued_header = None;
188 }
189
190 Ok(log)
191 }
192}
193
194#[cfg(test)]
199mod tests {
200 use super::*;
201 use crate::monitor::ONE_MINUTE;
202
203 #[test]
204 fn test_default_trigger_interval() {
205 assert_eq!(ONE_MINUTE, 60_000);
206 }
207
208 #[test]
209 fn test_task_name() {
210 assert_eq!("NewHeader", "NewHeader");
211 }
212
213 #[test]
214 fn test_check_now_flag_interaction() {
215 let check_now = Arc::new(AtomicBool::new(false));
216 assert!(!check_now.load(Ordering::SeqCst));
217 check_now.store(true, Ordering::SeqCst);
218 assert!(check_now.load(Ordering::SeqCst));
219 }
220}