kaspa_index_processor/
processor.rs

1use crate::{
2    errors::{IndexError, IndexResult},
3    IDENT,
4};
5use async_trait::async_trait;
6use kaspa_consensus_notify::{notification as consensus_notification, notification::Notification as ConsensusNotification};
7use kaspa_core::{debug, trace};
8use kaspa_index_core::notification::{Notification, PruningPointUtxoSetOverrideNotification, UtxosChangedNotification};
9use kaspa_notify::{
10    collector::{Collector, CollectorNotificationReceiver},
11    error::Result,
12    events::EventType,
13    notification::Notification as NotificationTrait,
14    notifier::DynNotify,
15};
16use kaspa_utils::triggers::SingleTrigger;
17use kaspa_utxoindex::api::UtxoIndexProxy;
18use std::sync::{
19    atomic::{AtomicBool, Ordering},
20    Arc,
21};
22
23/// Processor processes incoming consensus UtxosChanged and PruningPointUtxoSetOverride
24/// notifications submitting them to a UtxoIndex.
25///
26/// It also acts as a [`Collector`], converting the incoming consensus notifications
27/// into their pending local versions and relaying them to a local notifier.
28#[derive(Debug)]
29pub struct Processor {
30    /// An optional UTXO indexer
31    utxoindex: Option<UtxoIndexProxy>,
32
33    recv_channel: CollectorNotificationReceiver<ConsensusNotification>,
34
35    /// Has this collector been started?
36    is_started: Arc<AtomicBool>,
37
38    collect_shutdown: Arc<SingleTrigger>,
39}
40
41impl Processor {
42    pub fn new(utxoindex: Option<UtxoIndexProxy>, recv_channel: CollectorNotificationReceiver<ConsensusNotification>) -> Self {
43        Self {
44            utxoindex,
45            recv_channel,
46            collect_shutdown: Arc::new(SingleTrigger::new()),
47            is_started: Arc::new(AtomicBool::new(false)),
48        }
49    }
50
51    fn spawn_collecting_task(self: Arc<Self>, notifier: DynNotify<Notification>) {
52        // The task can only be spawned once
53        if self.is_started.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
54            return;
55        }
56        tokio::spawn(async move {
57            trace!("[Index processor] collecting task starting");
58
59            while let Ok(notification) = self.recv_channel.recv().await {
60                match self.process_notification(notification).await {
61                    Ok(notification) => match notifier.notify(notification) {
62                        Ok(_) => (),
63                        Err(err) => {
64                            trace!("[Index processor] notification sender error: {err:?}");
65                        }
66                    },
67                    Err(err) => {
68                        trace!("[Index processor] error while processing a consensus notification: {err:?}");
69                    }
70                }
71            }
72
73            debug!("[Index processor] notification stream ended");
74            self.collect_shutdown.trigger.trigger();
75            trace!("[Index processor] collecting task ended");
76        });
77    }
78
79    async fn process_notification(self: &Arc<Self>, notification: ConsensusNotification) -> IndexResult<Notification> {
80        match notification {
81            ConsensusNotification::UtxosChanged(utxos_changed) => {
82                Ok(Notification::UtxosChanged(self.process_utxos_changed(utxos_changed).await?))
83            }
84            ConsensusNotification::PruningPointUtxoSetOverride(_) => {
85                Ok(Notification::PruningPointUtxoSetOverride(PruningPointUtxoSetOverrideNotification {}))
86            }
87            _ => Err(IndexError::NotSupported(notification.event_type())),
88        }
89    }
90
91    async fn process_utxos_changed(
92        self: &Arc<Self>,
93        notification: consensus_notification::UtxosChangedNotification,
94    ) -> IndexResult<UtxosChangedNotification> {
95        trace!("[{IDENT}]: processing {:?}", notification);
96        if let Some(utxoindex) = self.utxoindex.clone() {
97            let converted_notification: UtxosChangedNotification =
98                utxoindex.update(notification.accumulated_utxo_diff.clone(), notification.virtual_parents).await?.into();
99            debug!(
100                "IDXPRC, Creating UtxosChanged notifications with {} added and {} removed utxos",
101                converted_notification.added.len(),
102                converted_notification.removed.len()
103            );
104            return Ok(converted_notification);
105        };
106        Err(IndexError::NotSupported(EventType::UtxosChanged))
107    }
108
109    async fn join_collecting_task(&self) -> Result<()> {
110        trace!("[Index processor] joining");
111        self.collect_shutdown.listener.clone().await;
112        debug!("[Index processor] terminated");
113        Ok(())
114    }
115}
116
117#[async_trait]
118impl Collector<Notification> for Processor {
119    fn start(self: Arc<Self>, notifier: DynNotify<Notification>) {
120        self.spawn_collecting_task(notifier);
121    }
122
123    async fn join(self: Arc<Self>) -> Result<()> {
124        self.join_collecting_task().await
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use super::*;
131    use async_channel::{unbounded, Receiver, Sender};
132    use kaspa_consensus::{config::Config, consensus::test_consensus::TestConsensus, params::DEVNET_PARAMS, test_helpers::*};
133    use kaspa_consensus_core::utxo::{utxo_collection::UtxoCollection, utxo_diff::UtxoDiff};
134    use kaspa_consensusmanager::ConsensusManager;
135    use kaspa_database::create_temp_db;
136    use kaspa_database::prelude::ConnBuilder;
137    use kaspa_database::utils::DbLifetime;
138    use kaspa_notify::notifier::test_helpers::NotifyMock;
139    use kaspa_utxoindex::UtxoIndex;
140    use rand::{rngs::SmallRng, SeedableRng};
141    use std::sync::Arc;
142
143    // TODO: rewrite with Simnet, when possible.
144
145    #[allow(dead_code)]
146    struct NotifyPipeline {
147        consensus_sender: Sender<ConsensusNotification>,
148        processor: Arc<Processor>,
149        processor_receiver: Receiver<Notification>,
150        test_consensus: TestConsensus,
151        utxoindex_db_lifetime: DbLifetime,
152    }
153
154    impl NotifyPipeline {
155        fn new() -> Self {
156            let (consensus_sender, consensus_receiver) = unbounded();
157            let (utxoindex_db_lifetime, utxoindex_db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
158            let config = Arc::new(Config::new(DEVNET_PARAMS));
159            let tc = TestConsensus::new(&config);
160            tc.init();
161            let consensus_manager = Arc::new(ConsensusManager::from_consensus(tc.consensus_clone()));
162            let utxoindex = Some(UtxoIndexProxy::new(UtxoIndex::new(consensus_manager, utxoindex_db).unwrap()));
163            let processor = Arc::new(Processor::new(utxoindex, consensus_receiver));
164            let (processor_sender, processor_receiver) = unbounded();
165            let notifier = Arc::new(NotifyMock::new(processor_sender));
166            processor.clone().start(notifier);
167            Self { test_consensus: tc, consensus_sender, processor, processor_receiver, utxoindex_db_lifetime }
168        }
169    }
170
171    #[tokio::test]
172    async fn test_utxos_changed_notification() {
173        let pipeline = NotifyPipeline::new();
174        let rng = &mut SmallRng::seed_from_u64(42);
175
176        let mut to_add_collection = UtxoCollection::new();
177        let mut to_remove_collection = UtxoCollection::new();
178        for _ in 0..2 {
179            to_add_collection.insert(generate_random_outpoint(rng), generate_random_utxo(rng));
180            to_remove_collection.insert(generate_random_outpoint(rng), generate_random_utxo(rng));
181        }
182
183        let test_notification = consensus_notification::UtxosChangedNotification::new(
184            Arc::new(UtxoDiff { add: to_add_collection, remove: to_remove_collection }),
185            Arc::new(generate_random_hashes(rng, 2)),
186        );
187
188        pipeline.consensus_sender.send(ConsensusNotification::UtxosChanged(test_notification.clone())).await.expect("expected send");
189
190        match pipeline.processor_receiver.recv().await.expect("receives a notification") {
191            Notification::UtxosChanged(utxo_changed_notification) => {
192                let mut notification_utxo_added_count = 0;
193                for (script_public_key, compact_utxo_collection) in utxo_changed_notification.added.iter() {
194                    for (transaction_outpoint, compact_utxo) in compact_utxo_collection.iter() {
195                        let test_utxo = test_notification
196                            .accumulated_utxo_diff
197                            .add
198                            .get(transaction_outpoint)
199                            .expect("expected transaction outpoint to be in test event");
200                        assert_eq!(test_utxo.script_public_key, *script_public_key);
201                        assert_eq!(test_utxo.amount, compact_utxo.amount);
202                        assert_eq!(test_utxo.block_daa_score, compact_utxo.block_daa_score);
203                        assert_eq!(test_utxo.is_coinbase, compact_utxo.is_coinbase);
204                        notification_utxo_added_count += 1;
205                    }
206                }
207                assert_eq!(test_notification.accumulated_utxo_diff.add.len(), notification_utxo_added_count);
208
209                let mut notification_utxo_removed_count = 0;
210                for (script_public_key, compact_utxo_collection) in utxo_changed_notification.removed.iter() {
211                    for (transaction_outpoint, compact_utxo) in compact_utxo_collection.iter() {
212                        let test_utxo = test_notification
213                            .accumulated_utxo_diff
214                            .remove
215                            .get(transaction_outpoint)
216                            .expect("expected transaction outpoint to be in test event");
217                        assert_eq!(test_utxo.script_public_key, *script_public_key);
218                        assert_eq!(test_utxo.amount, compact_utxo.amount);
219                        assert_eq!(test_utxo.block_daa_score, compact_utxo.block_daa_score);
220                        assert_eq!(test_utxo.is_coinbase, compact_utxo.is_coinbase);
221                        notification_utxo_removed_count += 1;
222                    }
223                }
224                assert_eq!(test_notification.accumulated_utxo_diff.remove.len(), notification_utxo_removed_count);
225            }
226            unexpected_notification => panic!("Unexpected notification: {unexpected_notification:?}"),
227        }
228        assert!(pipeline.processor_receiver.is_empty(), "the notification receiver should be empty");
229        pipeline.consensus_sender.close();
230        pipeline.processor.clone().join().await.expect("stopping the processor must succeed");
231    }
232
233    #[tokio::test]
234    async fn test_pruning_point_utxo_set_override_notification() {
235        let pipeline = NotifyPipeline::new();
236        let test_notification = consensus_notification::PruningPointUtxoSetOverrideNotification {};
237        pipeline
238            .consensus_sender
239            .send(ConsensusNotification::PruningPointUtxoSetOverride(test_notification.clone()))
240            .await
241            .expect("expected send");
242        match pipeline.processor_receiver.recv().await.expect("expected recv") {
243            Notification::PruningPointUtxoSetOverride(_) => (),
244            unexpected_notification => panic!("Unexpected notification: {unexpected_notification:?}"),
245        }
246        assert!(pipeline.processor_receiver.is_empty(), "the notification receiver should be empty");
247        pipeline.consensus_sender.close();
248        pipeline.processor.clone().join().await.expect("stopping the processor must succeed");
249    }
250}