1use crate::{
2 errors::{IndexError, IndexResult},
3 IDENT,
4};
5use async_trait::async_trait;
6use kaspa_consensus_notify::{notification as consensus_notification, notification::Notification as ConsensusNotification};
7use kaspa_core::{debug, trace};
8use kaspa_index_core::notification::{Notification, PruningPointUtxoSetOverrideNotification, UtxosChangedNotification};
9use kaspa_notify::{
10 collector::{Collector, CollectorNotificationReceiver},
11 error::Result,
12 events::EventType,
13 notification::Notification as NotificationTrait,
14 notifier::DynNotify,
15};
16use kaspa_utils::triggers::SingleTrigger;
17use kaspa_utxoindex::api::UtxoIndexProxy;
18use std::sync::{
19 atomic::{AtomicBool, Ordering},
20 Arc,
21};
22
/// Collector that reads consensus notifications from a channel, converts the supported
/// ones into index notifications and forwards them to a notifier.
#[derive(Debug)]
pub struct Processor {
    /// Optional UTXO index proxy. When `None`, `UtxosChanged` notifications cannot be
    /// processed and are rejected with `IndexError::NotSupported`.
    utxoindex: Option<UtxoIndexProxy>,

    /// Channel the collecting task receives consensus notifications from.
    recv_channel: CollectorNotificationReceiver<ConsensusNotification>,

    /// Flipped from `false` to `true` by the first spawn of the collecting task;
    /// later spawn attempts see it set and return without doing anything.
    is_started: Arc<AtomicBool>,

    /// Triggered by the collecting task once the notification stream has ended;
    /// its listener is what joining the task awaits.
    collect_shutdown: Arc<SingleTrigger>,
}
40
41impl Processor {
42 pub fn new(utxoindex: Option<UtxoIndexProxy>, recv_channel: CollectorNotificationReceiver<ConsensusNotification>) -> Self {
43 Self {
44 utxoindex,
45 recv_channel,
46 collect_shutdown: Arc::new(SingleTrigger::new()),
47 is_started: Arc::new(AtomicBool::new(false)),
48 }
49 }
50
51 fn spawn_collecting_task(self: Arc<Self>, notifier: DynNotify<Notification>) {
52 if self.is_started.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
54 return;
55 }
56 tokio::spawn(async move {
57 trace!("[Index processor] collecting task starting");
58
59 while let Ok(notification) = self.recv_channel.recv().await {
60 match self.process_notification(notification).await {
61 Ok(notification) => match notifier.notify(notification) {
62 Ok(_) => (),
63 Err(err) => {
64 trace!("[Index processor] notification sender error: {err:?}");
65 }
66 },
67 Err(err) => {
68 trace!("[Index processor] error while processing a consensus notification: {err:?}");
69 }
70 }
71 }
72
73 debug!("[Index processor] notification stream ended");
74 self.collect_shutdown.trigger.trigger();
75 trace!("[Index processor] collecting task ended");
76 });
77 }
78
79 async fn process_notification(self: &Arc<Self>, notification: ConsensusNotification) -> IndexResult<Notification> {
80 match notification {
81 ConsensusNotification::UtxosChanged(utxos_changed) => {
82 Ok(Notification::UtxosChanged(self.process_utxos_changed(utxos_changed).await?))
83 }
84 ConsensusNotification::PruningPointUtxoSetOverride(_) => {
85 Ok(Notification::PruningPointUtxoSetOverride(PruningPointUtxoSetOverrideNotification {}))
86 }
87 _ => Err(IndexError::NotSupported(notification.event_type())),
88 }
89 }
90
91 async fn process_utxos_changed(
92 self: &Arc<Self>,
93 notification: consensus_notification::UtxosChangedNotification,
94 ) -> IndexResult<UtxosChangedNotification> {
95 trace!("[{IDENT}]: processing {:?}", notification);
96 if let Some(utxoindex) = self.utxoindex.clone() {
97 let converted_notification: UtxosChangedNotification =
98 utxoindex.update(notification.accumulated_utxo_diff.clone(), notification.virtual_parents).await?.into();
99 debug!(
100 "IDXPRC, Creating UtxosChanged notifications with {} added and {} removed utxos",
101 converted_notification.added.len(),
102 converted_notification.removed.len()
103 );
104 return Ok(converted_notification);
105 };
106 Err(IndexError::NotSupported(EventType::UtxosChanged))
107 }
108
109 async fn join_collecting_task(&self) -> Result<()> {
110 trace!("[Index processor] joining");
111 self.collect_shutdown.listener.clone().await;
112 debug!("[Index processor] terminated");
113 Ok(())
114 }
115}
116
117#[async_trait]
118impl Collector<Notification> for Processor {
119 fn start(self: Arc<Self>, notifier: DynNotify<Notification>) {
120 self.spawn_collecting_task(notifier);
121 }
122
123 async fn join(self: Arc<Self>) -> Result<()> {
124 self.join_collecting_task().await
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use async_channel::{unbounded, Receiver, Sender};
    use kaspa_consensus::{config::Config, consensus::test_consensus::TestConsensus, params::DEVNET_PARAMS, test_helpers::*};
    use kaspa_consensus_core::utxo::{utxo_collection::UtxoCollection, utxo_diff::UtxoDiff};
    use kaspa_consensusmanager::ConsensusManager;
    use kaspa_database::create_temp_db;
    use kaspa_database::prelude::ConnBuilder;
    use kaspa_database::utils::DbLifetime;
    use kaspa_notify::notifier::test_helpers::NotifyMock;
    use kaspa_utxoindex::UtxoIndex;
    use rand::{rngs::SmallRng, SeedableRng};
    use std::sync::Arc;

    // Test harness wiring a consensus-side sender, a started `Processor` and a receiver for
    // the notifications the processor emits.
    // Some fields are never read by the tests, hence the `dead_code` allowance; presumably
    // they are held so the resources they own live as long as the pipeline.
    #[allow(dead_code)]
    struct NotifyPipeline {
        consensus_sender: Sender<ConsensusNotification>,
        processor: Arc<Processor>,
        processor_receiver: Receiver<Notification>,
        test_consensus: TestConsensus,
        utxoindex_db_lifetime: DbLifetime,
    }

    impl NotifyPipeline {
        // Builds the whole pipeline and starts the processor.
        fn new() -> Self {
            // Inbound side: the channel standing in for consensus.
            let (consensus_sender, consensus_receiver) = unbounded();

            // UTXO index backed by a temporary database and a devnet test consensus.
            let (utxoindex_db_lifetime, utxoindex_db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
            let config = Arc::new(Config::new(DEVNET_PARAMS));
            let test_consensus = TestConsensus::new(&config);
            test_consensus.init();
            let consensus_manager = Arc::new(ConsensusManager::from_consensus(test_consensus.consensus_clone()));
            let index = UtxoIndex::new(consensus_manager, utxoindex_db).unwrap();
            let processor = Arc::new(Processor::new(Some(UtxoIndexProxy::new(index)), consensus_receiver));

            // Outbound side: a mock notifier forwarding into a channel the tests read from.
            let (processor_sender, processor_receiver) = unbounded();
            let notifier = Arc::new(NotifyMock::new(processor_sender));
            Arc::clone(&processor).start(notifier);

            Self { consensus_sender, processor, processor_receiver, test_consensus, utxoindex_db_lifetime }
        }
    }

    #[tokio::test]
    async fn test_utxos_changed_notification() {
        let pipeline = NotifyPipeline::new();
        let rng = &mut SmallRng::seed_from_u64(42);

        // Two random entries on each side of the diff.
        let mut added = UtxoCollection::new();
        let mut removed = UtxoCollection::new();
        for _ in 0..2 {
            let (outpoint, entry) = (generate_random_outpoint(rng), generate_random_utxo(rng));
            added.insert(outpoint, entry);
            let (outpoint, entry) = (generate_random_outpoint(rng), generate_random_utxo(rng));
            removed.insert(outpoint, entry);
        }

        let utxo_diff = Arc::new(UtxoDiff { add: added, remove: removed });
        let virtual_parents = Arc::new(generate_random_hashes(rng, 2));
        let test_notification = consensus_notification::UtxosChangedNotification::new(utxo_diff, virtual_parents);

        let outbound = ConsensusNotification::UtxosChanged(test_notification.clone());
        pipeline.consensus_sender.send(outbound).await.expect("expected send");

        let received = pipeline.processor_receiver.recv().await.expect("receives a notification");
        let changed = match received {
            Notification::UtxosChanged(changed) => changed,
            unexpected_notification => panic!("Unexpected notification: {unexpected_notification:?}"),
        };
        let expected_diff = &test_notification.accumulated_utxo_diff;

        // Every added utxo reported must match the entry sent under the same outpoint.
        let mut added_seen = 0;
        for (script_public_key, compact_utxos) in changed.added.iter() {
            for (outpoint, compact_utxo) in compact_utxos.iter() {
                let test_utxo = expected_diff.add.get(outpoint).expect("expected transaction outpoint to be in test event");
                assert_eq!(test_utxo.script_public_key, *script_public_key);
                assert_eq!(test_utxo.amount, compact_utxo.amount);
                assert_eq!(test_utxo.block_daa_score, compact_utxo.block_daa_score);
                assert_eq!(test_utxo.is_coinbase, compact_utxo.is_coinbase);
                added_seen += 1;
            }
        }
        assert_eq!(expected_diff.add.len(), added_seen);

        // Same check for the removed side.
        let mut removed_seen = 0;
        for (script_public_key, compact_utxos) in changed.removed.iter() {
            for (outpoint, compact_utxo) in compact_utxos.iter() {
                let test_utxo = expected_diff.remove.get(outpoint).expect("expected transaction outpoint to be in test event");
                assert_eq!(test_utxo.script_public_key, *script_public_key);
                assert_eq!(test_utxo.amount, compact_utxo.amount);
                assert_eq!(test_utxo.block_daa_score, compact_utxo.block_daa_score);
                assert_eq!(test_utxo.is_coinbase, compact_utxo.is_coinbase);
                removed_seen += 1;
            }
        }
        assert_eq!(expected_diff.remove.len(), removed_seen);

        assert!(pipeline.processor_receiver.is_empty(), "the notification receiver should be empty");
        // Closing the inbound channel ends the collecting task, which lets `join` return.
        pipeline.consensus_sender.close();
        Arc::clone(&pipeline.processor).join().await.expect("stopping the processor must succeed");
    }

    #[tokio::test]
    async fn test_pruning_point_utxo_set_override_notification() {
        let pipeline = NotifyPipeline::new();
        let test_notification = consensus_notification::PruningPointUtxoSetOverrideNotification {};

        let outbound = ConsensusNotification::PruningPointUtxoSetOverride(test_notification.clone());
        pipeline.consensus_sender.send(outbound).await.expect("expected send");

        let received = pipeline.processor_receiver.recv().await.expect("expected recv");
        match received {
            Notification::PruningPointUtxoSetOverride(_) => (),
            unexpected_notification => panic!("Unexpected notification: {unexpected_notification:?}"),
        }

        assert!(pipeline.processor_receiver.is_empty(), "the notification receiver should be empty");
        // Closing the inbound channel ends the collecting task, which lets `join` return.
        pipeline.consensus_sender.close();
        Arc::clone(&pipeline.processor).join().await.expect("stopping the processor must succeed");
    }
}