1pub mod helpers;
10pub mod task_trait;
12pub mod tasks;
14
15use std::future::Future;
16use std::pin::Pin;
17use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
18use std::sync::Arc;
19
20use tracing::{error, info, warn};
21
22use crate::error::{WalletError, WalletResult};
23use crate::services::traits::WalletServices;
24use crate::services::types::BlockHeader;
25use crate::storage::find_args::PurgeParams;
26use crate::storage::manager::WalletStorageManager;
27use crate::types::Chain;
28
29use self::helpers::{log_event, now_msecs};
30use self::task_trait::WalletMonitorTask;
31
/// Milliseconds in one second; every other duration constant is derived from it.
pub const ONE_SECOND: u64 = 1_000;
/// Milliseconds in one minute.
pub const ONE_MINUTE: u64 = ONE_SECOND * 60;
/// Milliseconds in one hour.
pub const ONE_HOUR: u64 = ONE_MINUTE * 60;
/// Milliseconds in one day.
pub const ONE_DAY: u64 = ONE_HOUR * 24;
/// Milliseconds in one week.
pub const ONE_WEEK: u64 = ONE_DAY * 7;
46
/// Shared async callback: a `Send + Sync` function that takes a `T` and returns
/// a boxed, pinned, `Send` future resolving to `()`.
///
/// The `Arc` wrapper lets the same callback be cloned into several owners.
pub type AsyncCallback<T> =
    Arc<dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
54
/// Configuration for a [`Monitor`].
pub struct MonitorOptions {
    /// Chain the monitor operates on. `MonitorBuilder::build` overwrites this
    /// with the chain given to the builder.
    pub chain: Chain,

    /// Time in milliseconds the polling loop sleeps between passes over the tasks.
    pub task_run_wait_msecs: u64,

    /// Threshold in milliseconds handed to `TaskFailAbandoned` by the builder.
    pub abandoned_msecs: u64,

    /// Not read anywhere in this file; by its name presumably a delay between
    /// merkle-proof service requests — confirm against the tasks that use it.
    pub msecs_wait_per_merkle_proof_service_req: u64,

    /// Unproven-attempts limit used by the builder when the chain is `Chain::Test`.
    pub unproven_attempts_limit_test: u32,

    /// Unproven-attempts limit used by the builder for every other chain.
    pub unproven_attempts_limit_main: u32,

    /// Optional token handed to `TaskArcSse` by the builder.
    pub callback_token: Option<String>,

    /// Optional callback invoked by `Monitor::call_on_broadcasted_transaction`
    /// and handed to `TaskSendWaiting` by the builder.
    pub on_tx_broadcasted: Option<AsyncCallback<String>>,

    /// Optional callback invoked by `Monitor::call_on_proven_transaction`
    /// and handed to `TaskCheckForProofs` by the builder.
    pub on_tx_proven: Option<AsyncCallback<String>>,

    /// Optional callback invoked by `Monitor::call_on_transaction_status_changed`
    /// with `(txid, new_status)` and handed to `TaskArcSse` by the builder.
    pub on_tx_status_changed: Option<AsyncCallback<(String, String)>>,
}
93
94impl Default for MonitorOptions {
95 fn default() -> Self {
96 Self {
97 chain: Chain::Main,
98 task_run_wait_msecs: 5000,
99 abandoned_msecs: ONE_MINUTE * 5,
100 msecs_wait_per_merkle_proof_service_req: 500,
101 unproven_attempts_limit_test: 10,
102 unproven_attempts_limit_main: 144,
103 callback_token: None,
104 on_tx_broadcasted: None,
105 on_tx_proven: None,
106 on_tx_status_changed: None,
107 }
108 }
109}
110
/// A block header reported as deactivated by a reorg, queued for later handling.
#[derive(Debug, Clone)]
pub struct DeactivatedHeader {
    /// Timestamp in milliseconds; `Monitor::process_reorg` sets it to `now_msecs()`
    /// when the entry is queued.
    pub when_msecs: u64,
    /// Attempt counter; entries are queued with `0`.
    pub tries: u32,
    /// The deactivated block header itself.
    pub header: BlockHeader,
}
128
/// Runs a set of `WalletMonitorTask`s, either in a background polling loop
/// (`start_tasks` / `stop_tasks`) or on demand (`run_once`, `run_task`).
pub struct Monitor {
    /// Configuration captured from the builder.
    pub options: MonitorOptions,

    /// Storage manager used by the monitor, e.g. for `log_event` calls.
    pub storage: WalletStorageManager,

    /// Shared services handle given to the builder.
    pub services: Arc<dyn WalletServices>,

    /// Chain given to the builder.
    pub chain: Chain,

    /// Tasks polled by `run_once` and the background loop. `start_tasks` moves
    /// them into the spawned future, leaving this vector empty.
    tasks: Vec<Box<dyn WalletMonitorTask>>,

    /// Additional tasks searched by `run_task` only. The builder creates this
    /// empty and nothing in this file adds to it.
    other_tasks: Vec<Box<dyn WalletMonitorTask>>,

    /// `true` while the background loop should keep going; cleared by `stop_tasks`.
    running: Arc<AtomicBool>,

    /// Flag set by `process_new_block_header`; the builder shares it with
    /// `TaskNewHeader` and `TaskCheckForProofs`.
    pub check_now: Arc<AtomicBool>,

    /// Height of the last header passed to `process_new_block_header`;
    /// initialised to `u32::MAX` by the builder and shared with
    /// `TaskCheckForProofs` and `TaskCheckNoSends`.
    pub last_new_header_height: Arc<AtomicU32>,

    /// Last header passed to `process_new_block_header`, if any.
    pub last_new_header: Option<BlockHeader>,

    /// `now_msecs()` timestamp taken when `last_new_header` was recorded.
    pub last_new_header_when: Option<u64>,

    /// Queue filled by `process_reorg`; the builder shares it with `TaskReorg`.
    pub deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>>,

    /// Handle of the spawned polling loop, present between `start_tasks` and `stop_tasks`.
    join_handle: Option<tokio::task::JoinHandle<()>>,

    /// When `true`, the next `run_once` calls `async_setup` on every task first.
    run_async_setup: bool,
}
188
189pub fn default_purge_params() -> PurgeParams {
191 PurgeParams {
192 purge_spent: false,
193 purge_completed: false,
194 purge_failed: true,
195 purge_spent_age: 2 * ONE_WEEK,
196 purge_completed_age: 2 * ONE_WEEK,
197 purge_failed_age: 5 * ONE_DAY,
198 }
199}
200
201impl Monitor {
    /// Returns a fresh [`MonitorBuilder`] with nothing configured yet.
    pub fn builder() -> MonitorBuilder {
        MonitorBuilder::new()
    }
206
207 pub fn start_tasks(&mut self) -> WalletResult<()> {
211 if self.running.load(Ordering::SeqCst) {
212 return Err(WalletError::BadRequest(
213 "monitor tasks are already running".to_string(),
214 ));
215 }
216
217 self.running.store(true, Ordering::SeqCst);
218 self.run_async_setup = true;
219
220 let running = self.running.clone();
223 let _check_now = self.check_now.clone();
224 let _deactivated_headers = self.deactivated_headers.clone();
225 let task_run_wait_msecs = self.options.task_run_wait_msecs;
226
227 let mut tasks: Vec<Box<dyn WalletMonitorTask>> = std::mem::take(&mut self.tasks);
230 let storage = WalletStorageManager::new(
231 self.storage.auth_id().to_string(),
232 self.storage.active().cloned(),
233 self.storage.backups().to_vec(),
234 );
235
236 let handle = tokio::spawn(async move {
237 for task in tasks.iter_mut() {
239 if !running.load(Ordering::SeqCst) {
240 break;
241 }
242 if let Err(e) = task.async_setup().await {
243 let details = format!("monitor task {} asyncSetup error: {}", task.name(), e);
244 warn!("{}", details);
245 let _ = log_event(&storage, "error0", &details).await;
246 }
247 }
248
249 while running.load(Ordering::SeqCst) {
251 let now = now_msecs();
252
253 let mut triggered_indices = Vec::new();
255 for (i, task) in tasks.iter_mut().enumerate() {
256 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
257 task.trigger(now)
258 })) {
259 Ok(should_run) => {
260 if should_run {
261 triggered_indices.push(i);
262 }
263 }
264 Err(_) => {
265 let details = format!("monitor task {} trigger panicked", task.name());
266 error!("{}", details);
267 let _ = log_event(&storage, "error0", &details).await;
268 }
269 }
270 }
271
272 for idx in triggered_indices {
274 if !running.load(Ordering::SeqCst) {
275 break;
276 }
277 let task = &mut tasks[idx];
278 match task.run_task().await {
279 Ok(log) => {
280 if !log.is_empty() {
281 info!("Task {} {}", task.name(), &log[..log.len().min(1024)]);
282 let _ = log_event(&storage, task.name(), &log).await;
283 }
284 }
285 Err(e) => {
286 let details =
287 format!("monitor task {} runTask error: {}", task.name(), e);
288 error!("{}", details);
289 let _ = log_event(&storage, "error1", &details).await;
290 }
291 }
292 }
293
294 tokio::time::sleep(tokio::time::Duration::from_millis(task_run_wait_msecs)).await;
296 }
297
298 info!("Monitor polling loop stopped");
299 });
300
301 self.join_handle = Some(handle);
302 Ok(())
303 }
304
305 pub async fn stop_tasks(&mut self) {
307 self.running.store(false, Ordering::SeqCst);
308 if let Some(handle) = self.join_handle.take() {
309 let _ = handle.await;
310 }
311 }
312
    /// Shuts the monitor down; currently this only delegates to `stop_tasks`.
    pub async fn destroy(&mut self) {
        self.stop_tasks().await;
    }
318
319 pub async fn run_once(&mut self) -> WalletResult<()> {
323 if self.run_async_setup {
324 for task in self.tasks.iter_mut() {
325 if let Err(e) = task.async_setup().await {
326 let details = format!("monitor task {} asyncSetup error: {}", task.name(), e);
327 warn!("{}", details);
328 let _ = log_event(&self.storage, "error0", &details).await;
329 }
330 }
331 self.run_async_setup = false;
332 }
333
334 let now = now_msecs();
335
336 let mut triggered_indices = Vec::new();
338 for (i, task) in self.tasks.iter_mut().enumerate() {
339 if task.trigger(now) {
340 triggered_indices.push(i);
341 }
342 }
343
344 for idx in triggered_indices {
346 let task = &mut self.tasks[idx];
347 match task.run_task().await {
348 Ok(log) => {
349 if !log.is_empty() {
350 info!("Task {} {}", task.name(), &log[..log.len().min(1024)]);
351 let _ = log_event(&self.storage, task.name(), &log).await;
352 }
353 }
354 Err(e) => {
355 let details = format!("monitor task {} runTask error: {}", task.name(), e);
356 error!("{}", details);
357 let _ = log_event(&self.storage, "error1", &details).await;
358 }
359 }
360 }
361
362 Ok(())
363 }
364
365 pub async fn run_task(&mut self, name: &str) -> WalletResult<String> {
367 for task in self.tasks.iter_mut() {
369 if task.name() == name {
370 task.async_setup().await?;
371 return task.run_task().await;
372 }
373 }
374 for task in self.other_tasks.iter_mut() {
376 if task.name() == name {
377 task.async_setup().await?;
378 return task.run_task().await;
379 }
380 }
381 Err(WalletError::InvalidParameter {
382 parameter: "name".to_string(),
383 must_be: format!("an existing task name, '{}' not found", name),
384 })
385 }
386
387 pub fn process_new_block_header(&mut self, header: BlockHeader) {
391 self.last_new_header_height
393 .store(header.height, Ordering::SeqCst);
394 self.last_new_header = Some(header);
395 self.last_new_header_when = Some(now_msecs());
396 self.check_now.store(true, Ordering::SeqCst);
398 }
399
400 pub async fn process_reorg(
405 &self,
406 _depth: u32,
407 _old_tip: &BlockHeader,
408 _new_tip: &BlockHeader,
409 deactivated: Option<&[BlockHeader]>,
410 ) {
411 if let Some(headers) = deactivated {
412 let mut queue = self.deactivated_headers.lock().await;
413 for header in headers {
414 queue.push(DeactivatedHeader {
415 when_msecs: now_msecs(),
416 tries: 0,
417 header: header.clone(),
418 });
419 }
420 }
421 }
422
    /// Currently a no-op: the body is empty and `_header` is ignored.
    pub fn process_header(&self, _header: &BlockHeader) {
    }
430
431 pub async fn call_on_broadcasted_transaction(&self, broadcast_result: &str) {
437 if let Some(ref cb) = self.options.on_tx_broadcasted {
438 cb(broadcast_result.to_string()).await;
439 }
440 }
441
442 pub async fn call_on_proven_transaction(&self, tx_status: &str) {
444 if let Some(ref cb) = self.options.on_tx_proven {
445 cb(tx_status.to_string()).await;
446 }
447 }
448
449 pub async fn call_on_transaction_status_changed(&self, txid: &str, new_status: &str) {
451 if let Some(ref cb) = self.options.on_tx_status_changed {
452 cb((txid.to_string(), new_status.to_string())).await;
453 }
454 }
455
    /// Returns the current value of the `running` flag, which `start_tasks`
    /// sets and `stop_tasks` clears.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }
460
461 pub fn add_task(&mut self, task: Box<dyn WalletMonitorTask>) -> WalletResult<()> {
463 let name = task.name().to_string();
464 if self.tasks.iter().any(|t| t.name() == name) {
465 return Err(WalletError::BadRequest(format!(
466 "task {} has already been added",
467 name
468 )));
469 }
470 self.tasks.push(task);
471 Ok(())
472 }
473
    /// Drops every task in `self.tasks` whose name equals `name`; does nothing
    /// if there is none. `other_tasks` is left untouched.
    pub fn remove_task(&mut self, name: &str) {
        self.tasks.retain(|t| t.name() != name);
    }
478}
479
/// Step-by-step builder for [`Monitor`]; `chain`, `storage` and `services`
/// must be supplied before `build` succeeds.
pub struct MonitorBuilder {
    /// Required: chain the monitor runs on.
    chain: Option<Chain>,
    /// Required: storage manager for the monitor and its tasks.
    storage: Option<WalletStorageManager>,
    /// Required: services handle shared with the tasks.
    services: Option<Arc<dyn WalletServices>>,
    /// Options accumulated by the setter methods, starting from the defaults.
    options: MonitorOptions,
    /// Whether `build` installs the built-in task set including the
    /// `TaskArcSse` and `TaskSyncWhenIdle` tasks.
    default_tasks: bool,
    /// Whether `build` installs the built-in task set without those two tasks.
    multi_user_tasks: bool,
    /// Caller-supplied tasks appended after the built-in ones.
    extra_tasks: Vec<Box<dyn WalletMonitorTask>>,
    /// Names of built-in tasks to drop before the extra tasks are appended.
    removed_task_names: Vec<String>,
}
519
520impl MonitorBuilder {
521 fn new() -> Self {
522 Self {
523 chain: None,
524 storage: None,
525 services: None,
526 options: MonitorOptions::default(),
527 default_tasks: false,
528 multi_user_tasks: false,
529 extra_tasks: Vec::new(),
530 removed_task_names: Vec::new(),
531 }
532 }
533
534 pub fn chain(mut self, chain: Chain) -> Self {
536 self.chain = Some(chain);
537 self
538 }
539
540 pub fn storage(mut self, storage: WalletStorageManager) -> Self {
542 self.storage = Some(storage);
543 self
544 }
545
546 pub fn services(mut self, services: Arc<dyn WalletServices>) -> Self {
548 self.services = Some(services);
549 self
550 }
551
552 pub fn task_run_wait_msecs(mut self, msecs: u64) -> Self {
554 self.options.task_run_wait_msecs = msecs;
555 self
556 }
557
558 pub fn abandoned_msecs(mut self, msecs: u64) -> Self {
560 self.options.abandoned_msecs = msecs;
561 self
562 }
563
564 pub fn callback_token(mut self, token: String) -> Self {
566 self.options.callback_token = Some(token);
567 self
568 }
569
570 pub fn default_tasks(mut self) -> Self {
577 self.default_tasks = true;
578 self
579 }
580
581 pub fn multi_user_tasks(mut self) -> Self {
587 self.multi_user_tasks = true;
588 self
589 }
590
591 pub fn add_task(mut self, task: Box<dyn WalletMonitorTask>) -> Self {
593 self.extra_tasks.push(task);
594 self
595 }
596
597 pub fn remove_task(mut self, name: &str) -> Self {
599 self.removed_task_names.push(name.to_string());
600 self
601 }
602
603 pub fn on_tx_broadcasted(mut self, cb: AsyncCallback<String>) -> Self {
605 self.options.on_tx_broadcasted = Some(cb);
606 self
607 }
608
609 pub fn on_tx_proven(mut self, cb: AsyncCallback<String>) -> Self {
611 self.options.on_tx_proven = Some(cb);
612 self
613 }
614
615 pub fn on_tx_status_changed(mut self, cb: AsyncCallback<(String, String)>) -> Self {
617 self.options.on_tx_status_changed = Some(cb);
618 self
619 }
620
621 pub fn build(mut self) -> WalletResult<Monitor> {
626 let chain = self
627 .chain
628 .ok_or_else(|| WalletError::MissingParameter("chain".to_string()))?;
629 let storage = self
630 .storage
631 .ok_or_else(|| WalletError::MissingParameter("storage".to_string()))?;
632 let services = self
633 .services
634 .ok_or_else(|| WalletError::MissingParameter("services".to_string()))?;
635
636 self.options.chain = chain.clone();
637
638 let check_now = Arc::new(AtomicBool::new(false));
640 let last_new_header_height = Arc::new(AtomicU32::new(u32::MAX));
641 let deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>> =
642 Arc::new(tokio::sync::Mutex::new(Vec::new()));
643
644 let make_storage = |s: &WalletStorageManager| -> WalletStorageManager {
646 WalletStorageManager::new(
647 s.auth_id().to_string(),
648 s.active().cloned(),
649 s.backups().to_vec(),
650 )
651 };
652
653 let unproven_limit = match chain {
655 Chain::Test => self.options.unproven_attempts_limit_test,
656 _ => self.options.unproven_attempts_limit_main,
657 };
658
659 let mut tasks: Vec<Box<dyn WalletMonitorTask>> = Vec::new();
661
662 if self.default_tasks || self.multi_user_tasks {
663 tasks.push(Box::new(tasks::task_clock::TaskClock::new()));
665 tasks.push(Box::new(
666 tasks::task_monitor_call_history::TaskMonitorCallHistory::new(services.clone()),
667 ));
668
669 tasks.push(Box::new(tasks::task_new_header::TaskNewHeader::new(
671 make_storage(&storage),
672 services.clone(),
673 check_now.clone(),
674 )));
675 tasks.push(Box::new(tasks::task_send_waiting::TaskSendWaiting::new(
676 make_storage(&storage),
677 services.clone(),
678 chain.clone(),
679 self.options.on_tx_broadcasted.clone(),
680 )));
681 tasks.push(Box::new(
682 tasks::task_check_for_proofs::TaskCheckForProofs::new(
683 make_storage(&storage),
684 services.clone(),
685 chain.clone(),
686 check_now.clone(),
687 unproven_limit,
688 self.options.on_tx_proven.clone(),
689 last_new_header_height.clone(),
690 ),
691 ));
692 tasks.push(Box::new(tasks::task_check_no_sends::TaskCheckNoSends::new(
693 make_storage(&storage),
694 services.clone(),
695 chain.clone(),
696 unproven_limit,
697 last_new_header_height.clone(),
698 )));
699 tasks.push(Box::new(
700 tasks::task_fail_abandoned::TaskFailAbandoned::new(
701 make_storage(&storage),
702 self.options.abandoned_msecs,
703 ),
704 ));
705 tasks.push(Box::new(tasks::task_unfail::TaskUnFail::new(
706 make_storage(&storage),
707 services.clone(),
708 )));
709 tasks.push(Box::new(tasks::task_review_status::TaskReviewStatus::new(
710 make_storage(&storage),
711 )));
712 tasks.push(Box::new(tasks::task_reorg::TaskReorg::new(
713 make_storage(&storage),
714 services.clone(),
715 deactivated_headers.clone(),
716 )));
717
718 if self.default_tasks {
720 tasks.push(Box::new(tasks::task_arc_sse::TaskArcSse::new(
721 make_storage(&storage),
722 services.clone(),
723 self.options.callback_token.clone(),
724 self.options.on_tx_status_changed.clone(),
725 )));
726 tasks.push(Box::new(tasks::task_sync_when_idle::TaskSyncWhenIdle::new()));
727 }
728
729 tasks.push(Box::new(tasks::task_purge::TaskPurge::new(
731 make_storage(&storage),
732 default_purge_params(),
733 )));
734 }
735
736 for name in &self.removed_task_names {
738 tasks.retain(|t| t.name() != name.as_str());
739 }
740
741 tasks.append(&mut self.extra_tasks);
743
744 Ok(Monitor {
745 options: self.options,
746 storage,
747 services,
748 chain,
749 tasks,
750 other_tasks: Vec::new(),
751 running: Arc::new(AtomicBool::new(false)),
752 check_now,
753 last_new_header_height,
754 last_new_header: None,
755 last_new_header_when: None,
756 deactivated_headers,
757 join_handle: None,
758 run_async_setup: true,
759 })
760 }
761}
762
763#[cfg(test)]
768mod tests {
769 use super::*;
770 use crate::services::types::BlockHeader;
771
    /// Minimal `WalletServices` stand-in; it only carries the chain it reports.
    struct MockServices {
        /// Value returned by `WalletServices::chain`.
        chain: Chain,
    }
776
    /// Stub implementation: every method returns a fixed value, a default
    /// result, or a `NotImplemented("mock")` error, without any I/O.
    #[async_trait::async_trait]
    impl WalletServices for MockServices {
        /// Returns a clone of the configured chain.
        fn chain(&self) -> Chain {
            self.chain.clone()
        }
        /// Always fails with `NotImplemented`.
        async fn get_chain_tracker(
            &self,
        ) -> WalletResult<Box<dyn bsv::transaction::chain_tracker::ChainTracker>> {
            Err(WalletError::NotImplemented("mock".into()))
        }
        /// Returns the default (empty) merkle-path result.
        async fn get_merkle_path(
            &self,
            _txid: &str,
            _use_next: bool,
        ) -> crate::services::types::GetMerklePathResult {
            crate::services::types::GetMerklePathResult::default()
        }
        /// Returns the default (empty) raw-transaction result.
        async fn get_raw_tx(
            &self,
            _txid: &str,
            _use_next: bool,
        ) -> crate::services::types::GetRawTxResult {
            crate::services::types::GetRawTxResult::default()
        }
        /// Returns no results.
        async fn post_beef(
            &self,
            _beef: &[u8],
            _txids: &[String],
        ) -> Vec<crate::services::types::PostBeefResult> {
            vec![]
        }
        /// Returns a result with status `"error"` and error text `"mock"`.
        async fn get_utxo_status(
            &self,
            _output: &str,
            _output_format: Option<crate::services::types::GetUtxoStatusOutputFormat>,
            _outpoint: Option<&str>,
            _use_next: bool,
        ) -> crate::services::types::GetUtxoStatusResult {
            crate::services::types::GetUtxoStatusResult {
                name: "mock".to_string(),
                status: "error".to_string(),
                error: Some("mock".to_string()),
                is_utxo: None,
                details: vec![],
            }
        }
        /// Returns a result with status `"error"` and error text `"mock"`.
        async fn get_status_for_txids(
            &self,
            _txids: &[String],
            _use_next: bool,
        ) -> crate::services::types::GetStatusForTxidsResult {
            crate::services::types::GetStatusForTxidsResult {
                name: "mock".to_string(),
                status: "error".to_string(),
                error: Some("mock".to_string()),
                results: vec![],
            }
        }
        /// Returns a result with status `"error"` and error text `"mock"`.
        async fn get_script_hash_history(
            &self,
            _hash: &str,
            _use_next: bool,
        ) -> crate::services::types::GetScriptHashHistoryResult {
            crate::services::types::GetScriptHashHistoryResult {
                name: "mock".to_string(),
                status: "error".to_string(),
                error: Some("mock".to_string()),
                history: vec![],
            }
        }
        /// Always fails with `NotImplemented`.
        async fn hash_to_header(&self, _hash: &str) -> WalletResult<BlockHeader> {
            Err(WalletError::NotImplemented("mock".into()))
        }
        /// Always fails with `NotImplemented`.
        async fn get_header_for_height(&self, _height: u32) -> WalletResult<Vec<u8>> {
            Err(WalletError::NotImplemented("mock".into()))
        }
        /// Reports a fixed height of 800000.
        async fn get_height(&self) -> WalletResult<u32> {
            Ok(800000)
        }
        /// Always reports the input as final.
        async fn n_lock_time_is_final(
            &self,
            _input: crate::services::types::NLockTimeInput,
        ) -> WalletResult<bool> {
            Ok(true)
        }
        /// Always fails with `NotImplemented`.
        async fn get_bsv_exchange_rate(
            &self,
        ) -> WalletResult<crate::services::types::BsvExchangeRate> {
            Err(WalletError::NotImplemented("mock".into()))
        }
        /// Reports a fixed rate of `1.0`.
        async fn get_fiat_exchange_rate(
            &self,
            _currency: &str,
            _base: Option<&str>,
        ) -> WalletResult<f64> {
            Ok(1.0)
        }
        /// Always fails with `NotImplemented`.
        async fn get_fiat_exchange_rates(
            &self,
            _target: &[String],
        ) -> WalletResult<crate::services::types::FiatExchangeRates> {
            Err(WalletError::NotImplemented("mock".into()))
        }
        /// Returns an empty call history.
        fn get_services_call_history(
            &self,
            _reset: bool,
        ) -> crate::services::types::ServicesCallHistory {
            crate::services::types::ServicesCallHistory { services: vec![] }
        }
        /// Always fails with `NotImplemented`.
        async fn get_beef_for_txid(&self, _txid: &str) -> WalletResult<bsv::transaction::Beef> {
            Err(WalletError::NotImplemented("mock".into()))
        }
        /// Returns an empty string.
        fn hash_output_script(&self, _script: &[u8]) -> String {
            String::new()
        }
        /// Always reports `false`.
        async fn is_utxo(
            &self,
            _locking_script: &[u8],
            _txid: &str,
            _vout: u32,
        ) -> WalletResult<bool> {
            Ok(false)
        }
    }
901
902 #[test]
906 fn test_monitor_builder_validates_required_fields() {
907 let result = MonitorBuilder::new().build();
909 assert!(result.is_err());
910 match result {
911 Err(e) => assert!(
912 e.to_string().contains("chain"),
913 "Expected chain error, got: {}",
914 e
915 ),
916 Ok(_) => panic!("Expected error for missing chain"),
917 }
918
919 let result = MonitorBuilder::new().chain(Chain::Test).build();
921 assert!(result.is_err());
922 match result {
923 Err(e) => assert!(
924 e.to_string().contains("storage"),
925 "Expected storage error, got: {}",
926 e
927 ),
928 Ok(_) => panic!("Expected error for missing storage"),
929 }
930 }
931
932 #[test]
933 fn test_time_constants() {
934 assert_eq!(ONE_SECOND, 1000);
935 assert_eq!(ONE_MINUTE, 60_000);
936 assert_eq!(ONE_HOUR, 3_600_000);
937 assert_eq!(ONE_DAY, 86_400_000);
938 assert_eq!(ONE_WEEK, 604_800_000);
939 }
940
941 #[test]
942 fn test_default_purge_params() {
943 let params = default_purge_params();
944 assert!(!params.purge_spent);
945 assert!(!params.purge_completed);
946 assert!(params.purge_failed);
947 assert_eq!(params.purge_spent_age, 2 * ONE_WEEK);
948 assert_eq!(params.purge_completed_age, 2 * ONE_WEEK);
949 assert_eq!(params.purge_failed_age, 5 * ONE_DAY);
950 }
951
952 #[test]
953 fn test_deactivated_header() {
954 let header = BlockHeader {
955 version: 1,
956 previous_hash: "0000".to_string(),
957 merkle_root: "abcd".to_string(),
958 time: 1234567890,
959 bits: 0x1d00ffff,
960 nonce: 42,
961 height: 100,
962 hash: "blockhash".to_string(),
963 };
964 let dh = DeactivatedHeader {
965 when_msecs: 1000,
966 tries: 0,
967 header: header.clone(),
968 };
969 assert_eq!(dh.when_msecs, 1000);
970 assert_eq!(dh.tries, 0);
971 assert_eq!(dh.header.height, 100);
972 }
973
    /// Sanity check that `now_msecs` is a millisecond Unix timestamp from a
    /// plausible era.
    #[test]
    fn test_now_msecs_returns_reasonable_value() {
        let now = now_msecs();
        // 1_577_836_800_000 ms is 2020-01-01T00:00:00Z.
        assert!(now > 1_577_836_800_000);
    }
980
981 #[tokio::test]
982 async fn test_process_reorg_adds_deactivated_headers() {
983 let queue: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>> =
987 Arc::new(tokio::sync::Mutex::new(Vec::new()));
988
989 let header = BlockHeader {
990 version: 1,
991 previous_hash: "0000".to_string(),
992 merkle_root: "abcd".to_string(),
993 time: 1234567890,
994 bits: 0x1d00ffff,
995 nonce: 42,
996 height: 100,
997 hash: "blockhash".to_string(),
998 };
999
1000 {
1002 let mut q = queue.lock().await;
1003 q.push(DeactivatedHeader {
1004 when_msecs: now_msecs(),
1005 tries: 0,
1006 header: header.clone(),
1007 });
1008 }
1009
1010 let q = queue.lock().await;
1011 assert_eq!(q.len(), 1);
1012 assert_eq!(q[0].header.height, 100);
1013 assert_eq!(q[0].tries, 0);
1014 }
1015}